You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/02 05:00:18 UTC

[44/50] [abbrv] phoenix git commit: PHOENIX-2629 NoClassDef error for BaseDecoder on log replay

PHOENIX-2629 NoClassDef error for BaseDecoder on log replay


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c89903ec
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c89903ec
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c89903ec

Branch: refs/heads/calcite
Commit: c89903ec5c176eb93abe32437b2ac171b6f6c552
Parents: 791a27c
Author: Samarth <sa...@salesforce.com>
Authored: Thu Jan 28 13:30:54 2016 -0800
Committer: Samarth <sa...@salesforce.com>
Committed: Thu Jan 28 15:01:01 2016 -0800

----------------------------------------------------------------------
 .../wal/BinaryCompatibleBaseDecoder.java        | 110 +++++++++++++++++++
 .../regionserver/wal/IndexedWALEditCodec.java   |  83 +++++++++++++-
 2 files changed, 191 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c89903ec/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/BinaryCompatibleBaseDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/BinaryCompatibleBaseDecoder.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/BinaryCompatibleBaseDecoder.java
new file mode 100644
index 0000000..80f2dd2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/BinaryCompatibleBaseDecoder.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.annotation.Nonnull;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.codec.Codec;
+
+/**
+ * This class is a copy paste version of org.apache.hadoop.hbase.codec.BaseDecoder class. 
+ * This class is meant to be used in {@link IndexedWALEditCodec} when runtime version of
+ * HBase is older than 1.1.3. This is needed to handle binary incompatibility introduced by
+ * HBASE-14501. See PHOENIX-2629 and PHOENIX-2636 for details.
+ */
+public abstract class BinaryCompatibleBaseDecoder implements Codec.Decoder {
+  protected static final Log LOG = LogFactory.getLog(BinaryCompatibleBaseDecoder.class);
+
+  protected final InputStream in;
+  private Cell current = null;
+
+  protected static class PBIS extends PushbackInputStream {
+    public PBIS(InputStream in, int size) {
+      super(in, size);
+    }
+
+    public void resetBuf(int size) {
+      this.buf = new byte[size];
+      this.pos = size;
+    }
+  }
+
+  public BinaryCompatibleBaseDecoder(final InputStream in) {
+    this.in = new PBIS(in, 1);
+  }
+
+  @Override
+  public boolean advance() throws IOException {
+    int firstByte = in.read();
+    if (firstByte == -1) {
+      return false;
+    } else {
+      ((PBIS)in).unread(firstByte);
+    }
+
+    try {
+      this.current = parseCell();
+    } catch (IOException ioEx) {
+      ((PBIS)in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
+      rethrowEofException(ioEx);
+    }
+    return true;
+  }
+
+  private void rethrowEofException(IOException ioEx) throws IOException {
+    boolean isEof = false;
+    try {
+      isEof = this.in.available() == 0;
+    } catch (Throwable t) {
+      LOG.trace("Error getting available for error message - ignoring", t);
+    }
+    if (!isEof) throw ioEx;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Partial cell read caused by EOF", ioEx);
+    }
+    EOFException eofEx = new EOFException("Partial cell read");
+    eofEx.initCause(ioEx);
+    throw eofEx;
+  }
+
+  protected InputStream getInputStream() {
+    return in;
+  }
+
+  /**
+   * Extract a Cell.
+   * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe
+   * thrown if EOF is reached prematurely. Does not return null.
+   * @throws IOException
+   */
+  @Nonnull
+  protected abstract Cell parseCell() throws IOException;
+
+  @Override
+  public Cell current() {
+    return this.current;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c89903ec/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
index 2534b34..1a70e12 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.codec.BaseDecoder;
 import org.apache.hadoop.hbase.codec.BaseEncoder;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
 import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
 
@@ -50,23 +52,27 @@ public class IndexedWALEditCodec extends WALCellCodec {
   // the stream
   private static final int REGULAR_KEY_VALUE_MARKER = 0;
   private CompressionContext compression;
+  private static final int MIN_BINARY_COMPATIBLE_INDEX_CODEC_VERSION = VersionUtil.encodeVersion("1", "1", "3");
+  private final boolean useDefaultDecoder;
 
   public IndexedWALEditCodec(Configuration conf, CompressionContext compression) {
       super(conf, compression);
       this.compression = compression;
+      String hbaseVersion = VersionInfo.getVersion();
+      this.useDefaultDecoder = VersionUtil.encodeVersion(hbaseVersion) >= MIN_BINARY_COMPATIBLE_INDEX_CODEC_VERSION;
   }
 
   @Override
   public Decoder getDecoder(InputStream is) {
     // compression isn't enabled
     if (this.compression == null) {
-      return new IndexKeyValueDecoder(is);
+      return useDefaultDecoder ? new IndexKeyValueDecoder(is) : new BinaryCompatibleIndexKeyValueDecoder(is);
     }
 
     // there is compression, so we get the standard decoder to handle reading those kvs
     Decoder decoder = super.getDecoder(is);
     // compression is on, reqturn our custom decoder
-    return new CompressedIndexKeyValueDecoder(is, decoder);
+    return useDefaultDecoder ? new CompressedIndexKeyValueDecoder(is, decoder) : new BinaryCompatibleCompressedIndexKeyValueDecoder(is, decoder);
   }
 
   @Override
@@ -237,4 +243,77 @@ public class IndexedWALEditCodec extends WALCellCodec {
       }
     }
   }
+  
+  private static abstract class BinaryCompatiblePhoenixBaseDecoder extends BinaryCompatibleBaseDecoder {
+      protected DataInput dataInput;
+      public BinaryCompatiblePhoenixBaseDecoder(InputStream in) {
+        super(in);
+        dataInput = getDataInput(this.in);
+      } 
+  }
+  
+  /**
+   * This class is meant to be used when runtime version of HBase
+   * HBase is older than 1.1.3. This is needed to handle binary incompatibility introduced by
+   * HBASE-14501. See PHOENIX-2629 and PHOENIX-2636 for details.
+   */
+  private static class BinaryCompatibleIndexKeyValueDecoder extends BinaryCompatiblePhoenixBaseDecoder {
+      /**
+       * Create a Decoder on the given input stream with the given Decoder to parse
+       * generic {@link KeyValue}s.
+       * @param is stream to read from
+       */
+      public BinaryCompatibleIndexKeyValueDecoder(InputStream is){
+        super(is);
+      }
+
+      @Override
+      protected KeyValue parseCell() throws IOException{
+        return KeyValueCodec.readKeyValue(this.dataInput);
+      }
+  }
+  
+  /**
+   * This class is meant to be used when runtime version of HBase
+   * HBase is older than 1.1.3. This is needed to handle binary incompatibility introduced by
+   * HBASE-14501. See PHOENIX-2629 and PHOENIX-2636 for details.
+   */
+  private static class BinaryCompatibleCompressedIndexKeyValueDecoder extends BinaryCompatiblePhoenixBaseDecoder {
+
+      private Decoder decoder;
+
+      /**
+       * Create a Decoder on the given input stream with the given Decoder to parse
+       * generic {@link KeyValue}s.
+       * @param is stream to read from
+       * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected
+       *          compression.
+       */
+      public BinaryCompatibleCompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) {
+        super(is);
+        this.decoder = compressedDecoder;
+      }
+
+      @Override
+      protected Cell parseCell() throws IOException {
+        // reader the marker
+        int marker = this.in.read();
+        if (marker < 0) {
+          throw new EOFException(
+              "Unexepcted end of stream found while reading next (Indexed) KeyValue");
+        }
+
+        // do the normal thing, if its a regular kv
+        if (marker == REGULAR_KEY_VALUE_MARKER) {
+          if (!this.decoder.advance()) {
+            throw new IOException("Could not read next key-value from generic KeyValue Decoder!");
+          }
+          return this.decoder.current();
+        }
+
+        // its an indexedKeyValue, so parse it out specially
+        return KeyValueCodec.readKeyValue(this.dataInput);
+      }
+  }
+  
 }
\ No newline at end of file