You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by en...@apache.org on 2015/10/13 05:58:24 UTC

hbase git commit: HBASE-14501 NPE in replication with TDE

Repository: hbase
Updated Branches:
  refs/heads/master 6143b7694 -> 2ff6d0fe4


HBASE-14501 NPE in replication with TDE


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2ff6d0fe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2ff6d0fe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2ff6d0fe

Branch: refs/heads/master
Commit: 2ff6d0fe4789857ab51685949711d755dedd459a
Parents: 6143b76
Author: Enis Soztutar <en...@apache.org>
Authored: Mon Oct 12 20:37:34 2015 -0700
Committer: Enis Soztutar <en...@apache.org>
Committed: Mon Oct 12 20:37:34 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/KeyValue.java  |  8 +++--
 .../org/apache/hadoop/hbase/KeyValueUtil.java   | 13 ++++---
 .../apache/hadoop/hbase/codec/BaseDecoder.java  | 38 +++++++++++++++-----
 .../apache/hadoop/hbase/codec/CellCodec.java    |  1 +
 .../regionserver/wal/SecureWALCellCodec.java    |  9 ++---
 .../regionserver/ReplicationSource.java         |  3 +-
 6 files changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index 3630e9b..7534e9d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.Bytes.len;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -2400,8 +2401,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
    * Create a KeyValue reading from the raw InputStream.
    * Named <code>iscreate</code> so doesn't clash with {@link #create(DataInput)}
    * @param in
-   * @return Created KeyValue OR if we find a length of zero, we will return null which
-   * can be useful marking a stream as done.
+   * @return Created KeyValue or throws an exception
    * @throws IOException
    * {@link Deprecated} As of 1.2. Use {@link KeyValueUtil#iscreate(InputStream, boolean)} instead.
    */
@@ -2412,7 +2412,9 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId,
     while (bytesRead < intBytes.length) {
       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
       if (n < 0) {
-        if (bytesRead == 0) return null; // EOF at start is ok
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
       }
       bytesRead += n;

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
index 24d88b3..98e2205 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValueUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -187,7 +188,7 @@ public class KeyValueUtil {
    * position to the start of the next KeyValue. Does not allocate a new array or copy data.
    * @param bb
    * @param includesMvccVersion
-   * @param includesTags 
+   * @param includesTags
    */
   public static KeyValue nextShallowCopy(final ByteBuffer bb, final boolean includesMvccVersion,
       boolean includesTags) {
@@ -231,7 +232,7 @@ public class KeyValueUtil {
     return createFirstOnRow(CellUtil.cloneRow(in), CellUtil.cloneFamily(in),
       CellUtil.cloneQualifier(in), in.getTimestamp() - 1);
   }
-  
+
 
   /**
    * Create a KeyValue for the specified row, family and qualifier that would be
@@ -449,6 +450,7 @@ public class KeyValueUtil {
   @Deprecated
   public static List<KeyValue> ensureKeyValues(List<Cell> cells) {
     List<KeyValue> lazyList = Lists.transform(cells, new Function<Cell, KeyValue>() {
+      @Override
       public KeyValue apply(Cell arg0) {
         return KeyValueUtil.ensureKeyValue(arg0);
       }
@@ -491,8 +493,9 @@ public class KeyValueUtil {
     while (bytesRead < intBytes.length) {
       int n = in.read(intBytes, bytesRead, intBytes.length - bytesRead);
       if (n < 0) {
-        if (bytesRead == 0)
-          return null; // EOF at start is ok
+        if (bytesRead == 0) {
+          throw new EOFException();
+        }
         throw new IOException("Failed read of int, read " + bytesRead + " bytes");
       }
       bytesRead += n;
@@ -555,7 +558,7 @@ public class KeyValueUtil {
 
   /**
    * Create a KeyValue reading <code>length</code> from <code>in</code>
-   * 
+   *
    * @param length
    * @param in
    * @return Created KeyValue OR if we find a length of zero, we will return

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
index 51801a8..09dc37f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/BaseDecoder.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.hbase.codec;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.annotation.Nonnull;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,27 +35,41 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public abstract class BaseDecoder implements Codec.Decoder {
   protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
-  protected final InputStream in;
-  private boolean hasNext = true;
+
+  protected final PBIS in;
   private Cell current = null;
 
+  protected static class PBIS extends PushbackInputStream {
+    public PBIS(InputStream in, int size) {
+      super(in, size);
+    }
+
+    public void resetBuf(int size) {
+      this.buf = new byte[size];
+      this.pos = size;
+    }
+  }
+
   public BaseDecoder(final InputStream in) {
-    this.in = in;
+    this.in = new PBIS(in, 1);
   }
 
   @Override
   public boolean advance() throws IOException {
-    if (!this.hasNext) return this.hasNext;
-    if (this.in.available() == 0) {
-      this.hasNext = false;
-      return this.hasNext;
+    int firstByte = in.read();
+    if (firstByte == -1) {
+      return false;
+    } else {
+      in.unread(firstByte);
     }
+
     try {
       this.current = parseCell();
     } catch (IOException ioEx) {
+      in.resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
       rethrowEofException(ioEx);
     }
-    return this.hasNext;
+    return true;
   }
 
   private void rethrowEofException(IOException ioEx) throws IOException {
@@ -72,9 +89,12 @@ public abstract class BaseDecoder implements Codec.Decoder {
   }
 
   /**
-   * @return extract a Cell
+   * Extract a Cell.
+   * @return a parsed Cell or throws an Exception. EOFException or a generic IOException may be
+   * thrown if EOF is reached prematurely. Does not return null.
    * @throws IOException
    */
+  @Nonnull
   protected abstract Cell parseCell() throws IOException;
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
index a54c76e..666f440 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodec.java
@@ -79,6 +79,7 @@ public class CellCodec implements Codec {
       super(in);
     }
 
+    @Override
     protected Cell parseCell() throws IOException {
       byte [] row = readByteArray(this.in);
       byte [] family = readByteArray(in);

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
index 46f3b88..69181e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureWALCellCodec.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -84,12 +83,8 @@ public class SecureWALCellCodec extends WALCellCodec {
         return super.parseCell();
       }
       int ivLength = 0;
-      try {
-        ivLength = StreamUtils.readRawVarint32(in);
-      } catch (EOFException e) {
-        // EOF at start is OK
-        return null;
-      }
+
+      ivLength = StreamUtils.readRawVarint32(in);
 
       // TODO: An IV length of 0 could signify an unwrapped cell, when the
       // encoder supports that just read the remainder in directly

http://git-wip-us.apache.org/repos/asf/hbase/blob/2ff6d0fe/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3fa2ed7..3d99523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -855,9 +855,10 @@ public class ReplicationSource extends Thread
       int distinctRowKeys = 1;
       Cell lastCell = cells.get(0);
       for (int i = 0; i < edit.size(); i++) {
-        if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
+        if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
           distinctRowKeys++;
         }
+        lastCell = cells.get(i);
       }
       return distinctRowKeys;
     }