You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/01/22 08:19:45 UTC

svn commit: r1560265 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-common/src/main/java/org/apach...

Author: anoopsamjohn
Date: Wed Jan 22 07:19:45 2014
New Revision: 1560265

URL: http://svn.apache.org/r1560265
Log:
HBASE-10322 Strip tags from KV while sending back to client on reads.

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java
Removed:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecV2.java
    hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecV2.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java Wed Jan 22 07:19:45 2014
@@ -47,7 +47,8 @@ class HConnectionKey {
       HConstants.HBASE_RPC_TIMEOUT_KEY,
       HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
       HConstants.HBASE_META_SCANNER_CACHING,
-      HConstants.HBASE_CLIENT_INSTANCE_ID };
+      HConstants.HBASE_CLIENT_INSTANCE_ID,
+      HConstants.RPC_CODEC_CONF_KEY };
 
   private Map<String, String> properties;
   private String username;

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Jan 22 07:19:45 2014
@@ -126,7 +126,7 @@ class MultiServerCallable<R> extends Reg
     if (connection == null) return true; // Default is to do cellblocks.
     Configuration configuration = connection.getConfiguration();
     if (configuration == null) return true;
-    String codec = configuration.get("hbase.client.rpc.codec", "");
+    String codec = configuration.get(HConstants.RPC_CODEC_CONF_KEY, "");
     return codec != null && codec.length() > 0;
   }
 

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Wed Jan 22 07:19:45 2014
@@ -147,13 +147,8 @@ public class Put extends Mutation implem
     return addImmutable(family, qualifier, this.ts, value);
   }
 
-  public Put add(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
-    return add(family, qualifier, this.ts, value, tag);
-  }
-
   /**
-   * See {@link #add(byte[], byte[], byte[], Tag[] tag)}. This version expects
-   * that the underlying arrays won't change. It's intended
+   * This expects that the underlying arrays won't change. It's intended
    * for usage internal HBase to and for advanced client applications.
    */
   public Put addImmutable(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
@@ -197,20 +192,7 @@ public class Put extends Mutation implem
   }
 
   /**
-   * Forms a keyvalue with tags
-   */
-  @SuppressWarnings("unchecked")
-  public Put add(byte[] family, byte[] qualifier, long ts, byte[] value, Tag[] tag) {
-    List<Cell> list = getCellList(family);
-    KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
-    list.add(kv);
-    familyMap.put(CellUtil.cloneFamily(kv), list);
-    return this;
-  }
-
-  /**
-   * See {@link #add(byte[], byte[], long, byte[], Tag[] tag)}. This version expects
-   * that the underlying arrays won't change. It's intended
+   * This expects that the underlying arrays won't change. It's intended
    * for usage internal HBase to and for advanced client applications.
    */
   @SuppressWarnings("unchecked")
@@ -223,30 +205,7 @@ public class Put extends Mutation implem
   }
 
   /**
-   * Add the specified column and value, with the specified timestamp as
-   * its version to this Put operation.
-   * @param family family name
-   * @param qualifier column qualifier
-   * @param ts version timestamp
-   * @param value column value
-   * @param tag the tags
-   * @return this
-   */
-  public Put add(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value, Tag[] tag) {
-    if (ts < 0) {
-      throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
-    }
-    List<Cell> list = getCellList(family);
-    KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
-    list.add(kv);
-    familyMap.put(CellUtil.cloneFamily(kv), list);
-    return this;
-  }
-
-
-  /**
-   * See {@link #add(byte[], ByteBuffer, long, ByteBuffer, Tag[] tag)}. This version expects
-   * that the underlying arrays won't change. It's intended
+   * This expects that the underlying arrays won't change. It's intended
    * for usage internal HBase to and for advanced client applications.
    */
   public Put addImmutable(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value,

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Jan 22 07:19:45 2014
@@ -1298,9 +1298,9 @@ public class RpcClient {
    * @return Codec to use on this client.
    */
   Codec getCodec() {
-    // For NO CODEC, "hbase.client.rpc.codec" must be the empty string AND
-    // "hbase.client.default.rpc.codec" -- because default is to do cell block encoding.
-    String className = conf.get("hbase.client.rpc.codec", getDefaultCodec(this.conf));
+    // For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
+    // "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
+    String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
     if (className == null || className.length() == 0) return null;
     try {
       return (Codec)Class.forName(className).newInstance();

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Jan 22 07:19:45 2014
@@ -675,7 +675,10 @@ public final class ProtobufUtil {
               "Missing required field: qualifer value");
           }
           byte[] value = qv.getValue().toByteArray();
-          byte[] tags = qv.getTags().toByteArray();
+          byte[] tags = null;
+          if (qv.hasTags()) {
+            tags = qv.getTags().toByteArray();
+          }
           append.add(CellUtil.createCell(row, family, qualifier, append.getTimeStamp(),
               KeyValue.Type.Put, value, tags));
         }
@@ -750,7 +753,10 @@ public final class ProtobufUtil {
             throw new DoNotRetryIOException("Missing required field: qualifer value");
           }
           byte[] value = qv.getValue().toByteArray();
-          byte[] tags = qv.getTags().toByteArray();
+          byte[] tags = null;
+          if (qv.hasTags()) {
+            tags = qv.getTags().toByteArray();
+          }
           increment.add(CellUtil.createCell(row, family, qualifier, increment.getTimeStamp(),
               KeyValue.Type.Put, value, tags));
         }

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Jan 22 07:19:45 2014
@@ -973,6 +973,12 @@ public final class HConstants {
   /** Configuration key for enabling HLog encryption, a boolean */
   public static final String ENABLE_WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
 
+  /** Configuration key for setting RPC codec class name */
+  public static final String RPC_CODEC_CONF_KEY = "hbase.client.rpc.codec";
+
+  /** Configuration key for setting replication codec class name */
+  public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Wed Jan 22 07:19:45 2014
@@ -2840,7 +2840,9 @@ public class KeyValue implements Cell, H
    * @see #create(DataInput) for the inverse function
    * @see #write(KeyValue, DataOutput)
    */
-  public static long oswrite(final KeyValue kv, final OutputStream out) throws IOException {
+  @Deprecated
+  public static long oswrite(final KeyValue kv, final OutputStream out)
+      throws IOException {
     int length = kv.getLength();
     // This does same as DataOuput#writeInt (big-endian, etc.)
     out.write(Bytes.toBytes(length));
@@ -2849,6 +2851,30 @@ public class KeyValue implements Cell, H
   }
 
   /**
+   * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable but do
+   * not require a {@link DataOutput}, just take plain {@link OutputStream}.
+   * Named <code>oswrite</code> so does not clash with {@link #write(KeyValue, DataOutput)}
+   * @param kv
+   * @param out
+   * @param withTags true to write the tags too; false to write only the key and value parts
+   * @return Length written on stream
+   * @throws IOException
+   * @see #create(DataInput) for the inverse function
+   * @see #write(KeyValue, DataOutput)
+   */
+  public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
+      throws IOException {
+    int length = kv.getLength();
+    if (!withTags) {
+      length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
+    }
+    // This does same as DataOutput#writeInt (big-endian, etc.)
+    out.write(Bytes.toBytes(length));
+    out.write(kv.getBuffer(), kv.getOffset(), length);
+    return length + Bytes.SIZEOF_INT;
+  }
+
+  /**
    * Comparator that compares row component only of a KeyValue.
    */
   public static class RowOnlyComparator implements Comparator<KeyValue> {

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
+ * Uses ints delimiting all lengths. Profligate. Needs tune up.
+ * <b>Use this Codec only at server side.</b>
+ */
+@InterfaceAudience.Private
+public class CellCodecWithTags implements Codec {
+  static class CellEncoder extends BaseEncoder {
+    CellEncoder(final OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void write(Cell cell) throws IOException {
+      checkFlushed();
+      // Row
+      write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      // Column family
+      write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+      // Qualifier
+      write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+      // Version
+      this.out.write(Bytes.toBytes(cell.getTimestamp()));
+      // Type
+      this.out.write(cell.getTypeByte());
+      // Value
+      write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+      // Tags
+      write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+      // MvccVersion
+      this.out.write(Bytes.toBytes(cell.getMvccVersion()));
+    }
+
+    /**
+     * Write int length followed by array bytes.
+     *
+     * @param bytes
+     * @param offset
+     * @param length
+     * @throws IOException
+     */
+    private void write(final byte[] bytes, final int offset, final int length) throws IOException {
+      this.out.write(Bytes.toBytes(length));
+      this.out.write(bytes, offset, length);
+    }
+  }
+
+  static class CellDecoder extends BaseDecoder {
+    public CellDecoder(final InputStream in) {
+      super(in);
+    }
+
+    protected Cell parseCell() throws IOException {
+      byte[] row = readByteArray(this.in);
+      byte[] family = readByteArray(in);
+      byte[] qualifier = readByteArray(in);
+      byte[] longArray = new byte[Bytes.SIZEOF_LONG];
+      IOUtils.readFully(this.in, longArray);
+      long timestamp = Bytes.toLong(longArray);
+      byte type = (byte) this.in.read();
+      byte[] value = readByteArray(in);
+      byte[] tags = readByteArray(in);
+      // Read memstore version
+      byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
+      IOUtils.readFully(this.in, memstoreTSArray);
+      long memstoreTS = Bytes.toLong(memstoreTSArray);
+      return CellUtil.createCell(row, family, qualifier, timestamp, type, value, tags, memstoreTS);
+    }
+
+    /**
+     * @return Byte array read from the stream.
+     * @throws IOException
+     */
+    private byte[] readByteArray(final InputStream in) throws IOException {
+      byte[] intArray = new byte[Bytes.SIZEOF_INT];
+      IOUtils.readFully(in, intArray);
+      int length = Bytes.toInt(intArray);
+      byte[] bytes = new byte[length];
+      IOUtils.readFully(in, bytes);
+      return bytes;
+    }
+  }
+
+  @Override
+  public Decoder getDecoder(InputStream is) {
+    return new CellDecoder(is);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    return new CellEncoder(os);
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java Wed Jan 22 07:19:45 2014
@@ -55,7 +55,8 @@ public class KeyValueCodec implements Co
       checkFlushed();
       // This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
       // make expensive copy.
-      KeyValue.oswrite((KeyValue)KeyValueUtil.ensureKeyValue(cell), this.out);
+      // Do not write tags over RPC
+      KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, false);
     }
   }
 

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+
+/**
+ * Codec that does KeyValue version 1 serialization with serializing tags also.
+ *
+ * <p>
+ * Encodes by casting Cell to KeyValue and writing out the backing array with a length prefix. This
+ * is how KVs were serialized in Puts, Deletes and Results pre-0.96. It's what would happen if you
+ * called the Writable#write KeyValue implementation. This encoder will fail if the passed Cell is
+ * not an old-school pre-0.96 KeyValue. Does not copy bytes writing. It just writes them direct to
+ * the passed stream.
+ *
+ * <p>
+ * If you wrote two KeyValues to this encoder, it would look like this in the stream:
+ *
+ * <pre>
+ * length-of-KeyValue1 // A java int with the length of KeyValue1 backing array
+ * KeyValue1 backing array filled with a KeyValue serialized in its particular format
+ * length-of-KeyValue2
+ * KeyValue2 backing array
+ * </pre>
+ *
+ * Note: The only difference of this with KeyValueCodec is the latter ignores tags in KeyValues.
+ * <b>Use this Codec only at server side.</b>
+ */
+@InterfaceAudience.Private
+public class KeyValueCodecWithTags implements Codec {
+  public static class KeyValueEncoder extends BaseEncoder {
+    public KeyValueEncoder(final OutputStream out) {
+      super(out);
+    }
+
+    @Override
+    public void write(Cell cell) throws IOException {
+      checkFlushed();
+      // This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
+      // make expensive copy.
+      // Write tags
+      KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, true);
+    }
+  }
+
+  public static class KeyValueDecoder extends BaseDecoder {
+    public KeyValueDecoder(final InputStream in) {
+      super(in);
+    }
+
+    protected Cell parseCell() throws IOException {
+      return KeyValue.iscreate(in);
+    }
+  }
+
+  /**
+   * Implementation depends on {@link InputStream#available()}
+   */
+  @Override
+  public Decoder getDecoder(final InputStream is) {
+    return new KeyValueDecoder(is);
+  }
+
+  @Override
+  public Encoder getEncoder(OutputStream os) {
+    return new KeyValueEncoder(os);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestCellCodecWithTags {
+
+  @Test
+  public void testCellWithTag() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    Codec codec = new CellCodecWithTags();
+    Codec.Encoder encoder = codec.getEncoder(dos);
+    final Cell cell1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
+            new Tag((byte) 1, Bytes.toBytes("teststring1")),
+            new Tag((byte) 2, Bytes.toBytes("teststring2")) });
+    final Cell cell2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
+            Bytes.toBytes("teststring3")), });
+    final Cell cell3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
+            new Tag((byte) 2, Bytes.toBytes("teststring4")),
+            new Tag((byte) 2, Bytes.toBytes("teststring5")),
+            new Tag((byte) 1, Bytes.toBytes("teststring6")) });
+
+    encoder.write(cell1);
+    encoder.write(cell2);
+    encoder.write(cell3);
+    encoder.flush();
+    dos.close();
+    long offset = cos.getCount();
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = codec.getDecoder(dis);
+    assertTrue(decoder.advance());
+    Cell c = decoder.current();
+    assertTrue(CellComparator.equals(c, cell1));
+    List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(2, tags.size());
+    Tag tag = tags.get(0);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
+    tag = tags.get(1);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, cell2));
+    tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(1, tags.size());
+    tag = tags.get(0);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, cell3));
+    tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(3, tags.size());
+    tag = tags.get(0);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
+    tag = tags.get(1);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
+    tag = tags.get(2);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
+    assertFalse(decoder.advance());
+    dis.close();
+    assertEquals(offset, cis.getCount());
+  }
+}
\ No newline at end of file

Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestKeyValueCodecWithTags {
+
+  @Test
+  public void testKeyValueWithTag() throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CountingOutputStream cos = new CountingOutputStream(baos);
+    DataOutputStream dos = new DataOutputStream(cos);
+    Codec codec = new KeyValueCodecWithTags();
+    Codec.Encoder encoder = codec.getEncoder(dos);
+    final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
+            new Tag((byte) 1, Bytes.toBytes("teststring1")),
+            new Tag((byte) 2, Bytes.toBytes("teststring2")) });
+    final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
+            Bytes.toBytes("teststring3")), });
+    final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
+        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
+            new Tag((byte) 2, Bytes.toBytes("teststring4")),
+            new Tag((byte) 2, Bytes.toBytes("teststring5")),
+            new Tag((byte) 1, Bytes.toBytes("teststring6")) });
+
+    encoder.write(kv1);
+    encoder.write(kv2);
+    encoder.write(kv3);
+    encoder.flush();
+    dos.close();
+    long offset = cos.getCount();
+    CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    DataInputStream dis = new DataInputStream(cis);
+    Codec.Decoder decoder = codec.getDecoder(dis);
+    assertTrue(decoder.advance());
+    Cell c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv1));
+    List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(2, tags.size());
+    Tag tag = tags.get(0);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
+    tag = tags.get(1);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv2));
+    tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(1, tags.size());
+    tag = tags.get(0);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
+    assertTrue(decoder.advance());
+    c = decoder.current();
+    assertTrue(CellComparator.equals(c, kv3));
+    tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+    assertEquals(3, tags.size());
+    tag = tags.get(0);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
+    tag = tags.get(1);
+    assertEquals(2, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
+    tag = tags.get(2);
+    assertEquals(1, tag.getType());
+    assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
+    assertFalse(decoder.advance());
+    dis.close();
+    assertEquals(offset, cis.getCount());
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java Wed Jan 22 07:19:45 2014
@@ -293,14 +293,16 @@ public class WALCellCodec implements Cod
     }
   }
 
-  public class EnsureKvEncoder extends KeyValueCodec.KeyValueEncoder {
+  public class EnsureKvEncoder extends BaseEncoder {
     public EnsureKvEncoder(OutputStream out) {
       super(out);
     }
     @Override
     public void write(Cell cell) throws IOException {
       if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
-      super.write(cell);
+      checkFlushed();
+      // Make sure to write tags into WAL
+      KeyValue.oswrite((KeyValue) cell, this.out, true);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Jan 22 07:19:45 2014
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -98,6 +99,10 @@ public class ReplicationSink {
         this.conf.getInt("replication.sink.client.retries.number", 4));
     this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         this.conf.getInt("replication.sink.client.ops.timeout", 10000));
+    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+    if (StringUtils.isNotEmpty(replicationCodec)) {
+      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+    }
    }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Jan 22 07:19:45 2014
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -38,6 +39,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
@@ -144,7 +147,8 @@ public class ReplicationSource extends T
       final ReplicationPeers replicationPeers, final Stoppable stopper,
       final String peerClusterZnode, final UUID clusterId) throws IOException {
     this.stopper = stopper;
-    this.conf = conf;
+    this.conf = HBaseConfiguration.create(conf);
+    decorateConf();
     this.replicationQueueSizeCapacity =
         this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
     this.replicationQueueNbCapacity =
@@ -154,12 +158,12 @@ public class ReplicationSource extends T
         maxRetriesMultiplier * maxRetriesMultiplier);
     this.queue =
         new PriorityBlockingQueue<Path>(
-            conf.getInt("hbase.regionserver.maxlogs", 32),
+            this.conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
     // TODO: This connection is replication specific or we should make it particular to
     // replication and make replication specific settings such as compression or codec to use
     // passing Cells.
-    this.conn = HConnectionManager.getConnection(conf);
+    this.conn = HConnectionManager.getConnection(this.conf);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
     this.manager = manager;
@@ -174,10 +178,16 @@ public class ReplicationSource extends T
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
-    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
   }
 
+  private void decorateConf() {
+    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+    if (StringUtils.isNotEmpty(replicationCodec)) {
+      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+    }
+  }
 
   @Override
   public void enqueueLog(Path log) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Wed Jan 22 07:19:45 2014
@@ -1180,7 +1180,9 @@ public class PerformanceEvaluation exten
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+            value, tags);
+        put.add(kv);
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
@@ -1239,7 +1241,8 @@ public class PerformanceEvaluation exten
 
     @Override
     void testRow(final int i) throws IOException {
-      Put put = new Put(format(i));
+      byte[] row = format(i);
+      Put put = new Put(row);
       byte[] value = generateData(this.rand, VALUE_LENGTH);
       if (useTags) {
         byte[] tag = generateData(this.rand, TAG_LENGTH);
@@ -1248,7 +1251,9 @@ public class PerformanceEvaluation exten
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+            value, tags);
+        put.add(kv);
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Wed Jan 22 07:19:45 2014
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.Tag;
@@ -148,11 +149,13 @@ public class TestEncodedSeekers {
         Put put = new Put(key);
         byte[] col = Bytes.toBytes(String.valueOf(j));
         byte[] value = dataGenerator.generateRandomSizeValue(key, col);
-        put.add(CF_BYTES, col, value);
-        if(includeTags) {
+        if (includeTags) {
           Tag[] tag = new Tag[1];
-          tag[0] = new Tag((byte)1, "Visibility");
-          put.add(CF_BYTES, col, value, tag);
+          tag[0] = new Tag((byte) 1, "Visibility");
+          KeyValue kv = new KeyValue(key, CF_BYTES, col, HConstants.LATEST_TIMESTAMP, value, tag);
+          put.add(kv);
+        } else {
+          put.add(CF_BYTES, col, value);
         }
         if(VERBOSE){
           KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Wed Jan 22 07:19:45 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.Tag;
@@ -389,7 +390,9 @@ public class TestCacheOnWrite {
               Tag t = new Tag((byte) 1, "visibility");
               Tag[] tags = new Tag[1];
               tags[0] = t;
-              p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr), tags);
+              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
+              p.add(kv);
             } else {
               p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
             }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java Wed Jan 22 07:19:45 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Du
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -300,10 +301,8 @@ public class TestTags {
       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
       Put put = new Put(row);
       byte[] value = Bytes.toBytes("value");
-      Tag[] tags = new Tag[1];
-      tags[0] = new Tag((byte) 1, "ram");
-      put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value, tags);
-      // put.setAttribute("visibility", Bytes.toBytes("myTag"));
+      put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
+      put.setAttribute("visibility", Bytes.toBytes("ram"));
       table.put(put);
       Put put1 = new Put(row1);
       byte[] value1 = Bytes.toBytes("1000dfsdf");
@@ -336,8 +335,8 @@ public class TestTags {
       table.put(put2);
       put2 = new Put(rowe);
       value2 = Bytes.toBytes("1000dfsddfdf");
-      put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2, tags);
-      // put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
+      put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
+      put2.setAttribute("visibility", Bytes.toBytes("ram"));
       table.put(put2);
       admin.flush(tableName.getName());
       regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
@@ -418,90 +417,116 @@ public class TestTags {
       table = new HTable(TEST_UTIL.getConfiguration(), tableName);
       Put put = new Put(row1);
       byte[] v = Bytes.toBytes(2L);
-      put.add(f, q, v, new Tag[] { new Tag((byte) 1, "tag1") });
+      put.add(f, q, v);
+      put.setAttribute("visibility", Bytes.toBytes("tag1"));
       table.put(put);
       Increment increment = new Increment(row1);
       increment.addColumn(f, q, 1L);
       table.increment(increment);
+      TestCoprocessorForTags.checkTagPresence = true;
       ResultScanner scanner = table.getScanner(new Scan());
       Result result = scanner.next();
       KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      List<Tag> tags = kv.getTags();
+      List<Tag> tags = TestCoprocessorForTags.tags;
       assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(1, tags.size());
       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
+
       increment = new Increment(row1);
-      increment.add(new KeyValue(row1, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      increment.add(new KeyValue(row1, f, q, 1234L, v));
+      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
       table.increment(increment);
+      TestCoprocessorForTags.checkTagPresence = true;
       scanner = table.getScanner(new Scan());
       result = scanner.next();
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      tags = kv.getTags();
+      tags = TestCoprocessorForTags.tags;
       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(2, tags.size());
       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
       assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
 
       put = new Put(row2);
       v = Bytes.toBytes(2L);
       put.add(f, q, v);
       table.put(put);
       increment = new Increment(row2);
-      increment.add(new KeyValue(row2, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      increment.add(new KeyValue(row2, f, q, 1234L, v));
+      increment.setAttribute("visibility", Bytes.toBytes("tag2"));
       table.increment(increment);
       Scan scan = new Scan();
       scan.setStartRow(row2);
+      TestCoprocessorForTags.checkTagPresence = true;
       scanner = table.getScanner(scan);
       result = scanner.next();
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      tags = kv.getTags();
+      tags = TestCoprocessorForTags.tags;
       assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(1, tags.size());
       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
 
       // Test Append
       byte[] row3 = Bytes.toBytes("r3");
       put = new Put(row3);
-      put.add(f, q, Bytes.toBytes("a"), new Tag[] { new Tag((byte) 1, "tag1") });
+      put.add(f, q, Bytes.toBytes("a"));
+      put.setAttribute("visibility", Bytes.toBytes("tag1"));
       table.put(put);
       Append append = new Append(row3);
       append.add(f, q, Bytes.toBytes("b"));
       table.append(append);
       scan = new Scan();
       scan.setStartRow(row3);
+      TestCoprocessorForTags.checkTagPresence = true;
       scanner = table.getScanner(scan);
       result = scanner.next();
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      tags = kv.getTags();
+      tags = TestCoprocessorForTags.tags;
       assertEquals(1, tags.size());
       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
+
       append = new Append(row3);
-      append.add(new KeyValue(row3, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      append.add(new KeyValue(row3, f, q, 1234L, v));
+      append.setAttribute("visibility", Bytes.toBytes("tag2"));
       table.append(append);
+      TestCoprocessorForTags.checkTagPresence = true;
       scanner = table.getScanner(scan);
       result = scanner.next();
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      tags = kv.getTags();
+      tags = TestCoprocessorForTags.tags;
       assertEquals(2, tags.size());
       assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
       assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
 
       byte[] row4 = Bytes.toBytes("r4");
       put = new Put(row4);
       put.add(f, q, Bytes.toBytes("a"));
       table.put(put);
       append = new Append(row4);
-      append.add(new KeyValue(row4, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      append.add(new KeyValue(row4, f, q, 1234L, v));
+      append.setAttribute("visibility", Bytes.toBytes("tag2"));
       table.append(append);
       scan = new Scan();
       scan.setStartRow(row4);
+      TestCoprocessorForTags.checkTagPresence = true;
       scanner = table.getScanner(scan);
       result = scanner.next();
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
-      tags = kv.getTags();
+      tags = TestCoprocessorForTags.tags;
       assertEquals(1, tags.size());
       assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
     } finally {
+      TestCoprocessorForTags.checkTagPresence = false;
+      TestCoprocessorForTags.tags = null;
       if (table != null) {
         table.close();
       }
@@ -543,14 +568,22 @@ public class TestTags {
   }
 
   public static class TestCoprocessorForTags extends BaseRegionObserver {
+
+    public static boolean checkTagPresence = false;
+    public static List<Tag> tags = null;
+
     @Override
     public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
         final WALEdit edit, final Durability durability) throws IOException {
-      byte[] attribute = put.getAttribute("visibility");
+      updateMutationAddingTags(put);
+    }
+
+    private void updateMutationAddingTags(final Mutation m) {
+      byte[] attribute = m.getAttribute("visibility");
       byte[] cf = null;
       List<Cell> updatedCells = new ArrayList<Cell>();
       if (attribute != null) {
-        for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
+        for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
           for (Cell cell : edits) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             if (cf == null) {
@@ -567,10 +600,41 @@ public class TestTags {
             ((List<Cell>) updatedCells).add(newKV);
           }
         }
-        put.getFamilyCellMap().remove(cf);
+        m.getFamilyCellMap().remove(cf);
         // Update the family map
-        put.getFamilyCellMap().put(cf, updatedCells);
+        m.getFamilyCellMap().put(cf, updatedCells);
+      }
+    }
+
+    @Override
+    public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
+        throws IOException {
+      updateMutationAddingTags(increment);
+      return super.preIncrement(e, increment);
+    }
+
+    @Override
+    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
+        throws IOException {
+      updateMutationAddingTags(append);
+      return super.preAppend(e, append);
+    }
+
+    @Override
+    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
+        InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
+      if (checkTagPresence) {
+        if (results.size() > 0) {
+          // Check tag presence in the 1st cell in 1st Result
+          Result result = results.get(0);
+          CellScanner cellScanner = result.cellScanner();
+          if (cellScanner.advance()) {
+            Cell cell = cellScanner.current();
+            tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+          }
+        }
       }
+      return hasMore;
     }
   }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java Wed Jan 22 07:19:45 2014
@@ -44,7 +44,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Durability;
@@ -1100,7 +1102,9 @@ public class PerformanceEvaluation exten
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+            value, tags);
+        put.add(kv);
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }
@@ -1159,7 +1163,8 @@ public class PerformanceEvaluation exten
 
     @Override
     void testRow(final int i) throws IOException {
-      Put put = new Put(format(i));
+      byte[] row = format(i);
+      Put put = new Put(row);
       byte[] value = generateData(this.rand, ROW_LENGTH);
       if (useTags) {
         byte[] tag = generateData(this.rand, TAG_LENGTH);
@@ -1168,7 +1173,9 @@ public class PerformanceEvaluation exten
           Tag t = new Tag((byte) n, tag);
           tags[n] = t;
         }
-        put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+        KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+            value, tags);
+        put.add(kv);
       } else {
         put.add(FAMILY_NAME, QUALIFIER_NAME, value);
       }