Posted to commits@hbase.apache.org by an...@apache.org on 2014/01/22 08:19:45 UTC
svn commit: r1560265 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-common/src/main/java/org/apach...
Author: anoopsamjohn
Date: Wed Jan 22 07:19:45 2014
New Revision: 1560265
URL: http://svn.apache.org/r1560265
Log:
HBASE-10322 Strip tags from KV while sending back to client on reads.
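In effect, the stock client RPC codec now drops the tag block on the wire, while the new *WithTags codecs added below preserve it for server-side paths such as the WAL and replication. A minimal sketch of the client-visible behavior after this change (table and row names are illustrative, not from this commit; assumes the usual org.apache.hadoop.hbase imports):

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t");
    Result result = table.get(new Get(Bytes.toBytes("r1")));
    Cell cell = result.rawCells()[0];
    // Tags were stripped server-side before the cell was encoded for RPC:
    assert cell.getTagsLength() == 0;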
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java
Removed:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecV2.java
hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecV2.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionKey.java Wed Jan 22 07:19:45 2014
@@ -47,7 +47,8 @@ class HConnectionKey {
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.HBASE_META_SCANNER_CACHING,
- HConstants.HBASE_CLIENT_INSTANCE_ID };
+ HConstants.HBASE_CLIENT_INSTANCE_ID,
+ HConstants.RPC_CODEC_CONF_KEY };
private Map<String, String> properties;
private String username;
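Adding HConstants.RPC_CODEC_CONF_KEY to the HConnectionKey property list means two configurations that differ only in their RPC codec no longer share a cached connection. A sketch of the resulting behavior (an assumption drawn from how HConnectionKey is used for connection caching, not something this diff tests):

    Configuration c1 = HBaseConfiguration.create();
    Configuration c2 = HBaseConfiguration.create(c1);
    c2.set(HConstants.RPC_CODEC_CONF_KEY,
        "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");
    // Distinct codec settings now hash to distinct HConnectionKeys:
    HConnection conn1 = HConnectionManager.getConnection(c1);
    HConnection conn2 = HConnectionManager.getConnection(c2);
    assert conn1 != conn2;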
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java Wed Jan 22 07:19:45 2014
@@ -126,7 +126,7 @@ class MultiServerCallable<R> extends Reg
if (connection == null) return true; // Default is to do cellblocks.
Configuration configuration = connection.getConfiguration();
if (configuration == null) return true;
- String codec = configuration.get("hbase.client.rpc.codec", "");
+ String codec = configuration.get(HConstants.RPC_CODEC_CONF_KEY, "");
return codec != null && codec.length() > 0;
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Wed Jan 22 07:19:45 2014
@@ -147,13 +147,8 @@ public class Put extends Mutation implem
return addImmutable(family, qualifier, this.ts, value);
}
- public Put add(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
- return add(family, qualifier, this.ts, value, tag);
- }
-
/**
- * See {@link #add(byte[], byte[], byte[], Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
 * for internal HBase usage and for advanced client applications.
*/
public Put addImmutable(byte[] family, byte [] qualifier, byte [] value, Tag[] tag) {
@@ -197,20 +192,7 @@ public class Put extends Mutation implem
}
/**
- * Forms a keyvalue with tags
- */
- @SuppressWarnings("unchecked")
- public Put add(byte[] family, byte[] qualifier, long ts, byte[] value, Tag[] tag) {
- List<Cell> list = getCellList(family);
- KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
- list.add(kv);
- familyMap.put(CellUtil.cloneFamily(kv), list);
- return this;
- }
-
- /**
- * See {@link #add(byte[], byte[], long, byte[], Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
 * for internal HBase usage and for advanced client applications.
*/
@SuppressWarnings("unchecked")
@@ -223,30 +205,7 @@ public class Put extends Mutation implem
}
/**
- * Add the specified column and value, with the specified timestamp as
- * its version to this Put operation.
- * @param family family name
- * @param qualifier column qualifier
- * @param ts version timestamp
- * @param value column value
- * @param tag the tags
- * @return this
- */
- public Put add(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value, Tag[] tag) {
- if (ts < 0) {
- throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
- }
- List<Cell> list = getCellList(family);
- KeyValue kv = createPutKeyValue(family, qualifier, ts, value, tag);
- list.add(kv);
- familyMap.put(CellUtil.cloneFamily(kv), list);
- return this;
- }
-
-
- /**
- * See {@link #add(byte[], ByteBuffer, long, ByteBuffer, Tag[] tag)}. This version expects
- * that the underlying arrays won't change. It's intended
+ * This expects that the underlying arrays won't change. It's intended
 * for internal HBase usage and for advanced client applications.
*/
public Put addImmutable(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value,
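With the Tag-accepting Put.add(...) overloads gone, callers that legitimately need tags either use the addImmutable variants or build the KeyValue themselves and hand it to Put.add(KeyValue), as the test changes later in this commit do. A condensed sketch of that replacement pattern:

    byte[] row = Bytes.toBytes("r1");
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");
    byte[] value = Bytes.toBytes("v");
    Tag[] tags = new Tag[] { new Tag((byte) 1, Bytes.toBytes("t")) };
    Put put = new Put(row);
    // Build the tagged KeyValue directly instead of Put.add(family, qualifier, value, tags);
    // Put.add(KeyValue) throws IOException if the KeyValue's row does not match.
    put.add(new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, value, tags));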
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java Wed Jan 22 07:19:45 2014
@@ -1298,9 +1298,9 @@ public class RpcClient {
* @return Codec to use on this client.
*/
Codec getCodec() {
- // For NO CODEC, "hbase.client.rpc.codec" must be the empty string AND
- // "hbase.client.default.rpc.codec" -- because default is to do cell block encoding.
- String className = conf.get("hbase.client.rpc.codec", getDefaultCodec(this.conf));
+ // For NO CODEC, "hbase.client.rpc.codec" must be configured as the empty string AND so must
+ // "hbase.client.default.rpc.codec" -- because the default is to do cell block encoding.
+ String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
if (className == null || className.length() == 0) return null;
try {
return (Codec)Class.forName(className).newInstance();
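For reference, disabling cellblocks entirely (pure protobuf payloads) now takes both keys, matching the comment above; the second key name is the one consulted by getDefaultCodec():

    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.RPC_CODEC_CONF_KEY, "");     // no explicit codec
    conf.set("hbase.client.default.rpc.codec", "");  // and no default fallback
    // getCodec() then returns null and the client sends cells inside protobuf.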
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Jan 22 07:19:45 2014
@@ -675,7 +675,10 @@ public final class ProtobufUtil {
"Missing required field: qualifer value");
}
byte[] value = qv.getValue().toByteArray();
- byte[] tags = qv.getTags().toByteArray();
+ byte[] tags = null;
+ if (qv.hasTags()) {
+ tags = qv.getTags().toByteArray();
+ }
append.add(CellUtil.createCell(row, family, qualifier, append.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}
@@ -750,7 +753,10 @@ public final class ProtobufUtil {
throw new DoNotRetryIOException("Missing required field: qualifier value");
}
byte[] value = qv.getValue().toByteArray();
- byte[] tags = qv.getTags().toByteArray();
+ byte[] tags = null;
+ if (qv.hasTags()) {
+ tags = qv.getTags().toByteArray();
+ }
increment.add(CellUtil.createCell(row, family, qualifier, increment.getTimeStamp(),
KeyValue.Type.Put, value, tags));
}
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Jan 22 07:19:45 2014
@@ -973,6 +973,12 @@ public final class HConstants {
/** Configuration key for enabling HLog encryption, a boolean */
public static final String ENABLE_WAL_ENCRYPTION = "hbase.regionserver.wal.encryption";
+ /** Configuration key for setting RPC codec class name */
+ public static final String RPC_CODEC_CONF_KEY = "hbase.client.rpc.codec";
+
+ /** Configuration key for setting replication codec class name */
+ public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
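These two keys split what used to be a single knob: the RPC codec governs ordinary client traffic (tag-stripping by default), while the replication codec, when set, is copied over the RPC codec key by ReplicationSource and ReplicationSink (see their hunks below) so tags survive cross-cluster shipping. A hedged example of carrying tags through replication:

    // Set on both source and sink clusters; KeyValueCodecWithTags is added in this commit.
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.REPLICATION_CODEC_CONF_KEY,
        "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");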
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Wed Jan 22 07:19:45 2014
@@ -2840,7 +2840,9 @@ public class KeyValue implements Cell, H
* @see #create(DataInput) for the inverse function
* @see #write(KeyValue, DataOutput)
*/
- public static long oswrite(final KeyValue kv, final OutputStream out) throws IOException {
+ @Deprecated
+ public static long oswrite(final KeyValue kv, final OutputStream out)
+ throws IOException {
int length = kv.getLength();
// This does the same as DataOutput#writeInt (big-endian, etc.)
out.write(Bytes.toBytes(length));
@@ -2849,6 +2851,30 @@ public class KeyValue implements Cell, H
}
/**
+ * Write out a KeyValue in the manner in which we used to when KeyValue was a Writable, but
+ * without requiring a {@link DataOutput}; a plain {@link OutputStream} suffices.
+ * Named <code>oswrite</code> so it does not clash with {@link #write(KeyValue, DataOutput)}.
+ * @param kv
+ * @param out
+ * @param withTags
+ * @return Length written on stream
+ * @throws IOException
+ * @see #create(DataInput) for the inverse function
+ * @see #write(KeyValue, DataOutput)
+ */
+ public static long oswrite(final KeyValue kv, final OutputStream out, final boolean withTags)
+ throws IOException {
+ int length = kv.getLength();
+ if (!withTags) {
+ length = kv.getKeyLength() + kv.getValueLength() + KEYVALUE_INFRASTRUCTURE_SIZE;
+ }
+ // This does the same as DataOutput#writeInt (big-endian, etc.)
+ out.write(Bytes.toBytes(length));
+ out.write(kv.getBuffer(), kv.getOffset(), length);
+ return length + Bytes.SIZEOF_INT;
+ }
+
+ /**
* Comparator that compares row component only of a KeyValue.
*/
public static class RowOnlyComparator implements Comparator<KeyValue> {
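The truncation in the new oswrite above relies on the standard serialized KeyValue layout, in which the tag block trails the value; sketched here for clarity (layout per KeyValue's serialization, not spelled out in this diff):

    // [4B key length][4B value length][key bytes][value bytes][2B tags length][tag bytes]
    // KEYVALUE_INFRASTRUCTURE_SIZE covers the two leading length ints, so the write below
    // ends just before the tags-length short, dropping tags without any copying:
    int lengthWithoutTags = kv.getKeyLength() + kv.getValueLength()
        + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE;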
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/CellCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Basic Cell codec that just writes out all the individual elements of a Cell including the tags.
+ * Uses ints delimiting all lengths. Profligate. Needs tune up.
+ * <b>Use this Codec only on the server side.</b>
+ */
+@InterfaceAudience.Private
+public class CellCodecWithTags implements Codec {
+ static class CellEncoder extends BaseEncoder {
+ CellEncoder(final OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(Cell cell) throws IOException {
+ checkFlushed();
+ // Row
+ write(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ // Column family
+ write(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
+ // Qualifier
+ write(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
+ // Version
+ this.out.write(Bytes.toBytes(cell.getTimestamp()));
+ // Type
+ this.out.write(cell.getTypeByte());
+ // Value
+ write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ // Tags
+ write(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+ // MvccVersion
+ this.out.write(Bytes.toBytes(cell.getMvccVersion()));
+ }
+
+ /**
+ * Write int length followed by array bytes.
+ *
+ * @param bytes
+ * @param offset
+ * @param length
+ * @throws IOException
+ */
+ private void write(final byte[] bytes, final int offset, final int length) throws IOException {
+ this.out.write(Bytes.toBytes(length));
+ this.out.write(bytes, offset, length);
+ }
+ }
+
+ static class CellDecoder extends BaseDecoder {
+ public CellDecoder(final InputStream in) {
+ super(in);
+ }
+
+ protected Cell parseCell() throws IOException {
+ byte[] row = readByteArray(this.in);
+ byte[] family = readByteArray(in);
+ byte[] qualifier = readByteArray(in);
+ byte[] longArray = new byte[Bytes.SIZEOF_LONG];
+ IOUtils.readFully(this.in, longArray);
+ long timestamp = Bytes.toLong(longArray);
+ byte type = (byte) this.in.read();
+ byte[] value = readByteArray(in);
+ byte[] tags = readByteArray(in);
+ // Read memstore version
+ byte[] memstoreTSArray = new byte[Bytes.SIZEOF_LONG];
+ IOUtils.readFully(this.in, memstoreTSArray);
+ long memstoreTS = Bytes.toLong(memstoreTSArray);
+ return CellUtil.createCell(row, family, qualifier, timestamp, type, value, tags, memstoreTS);
+ }
+
+ /**
+ * @return Byte array read from the stream.
+ * @throws IOException
+ */
+ private byte[] readByteArray(final InputStream in) throws IOException {
+ byte[] intArray = new byte[Bytes.SIZEOF_INT];
+ IOUtils.readFully(in, intArray);
+ int length = Bytes.toInt(intArray);
+ byte[] bytes = new byte[length];
+ IOUtils.readFully(in, bytes);
+ return bytes;
+ }
+ }
+
+ @Override
+ public Decoder getDecoder(InputStream is) {
+ return new CellDecoder(is);
+ }
+
+ @Override
+ public Encoder getEncoder(OutputStream os) {
+ return new CellEncoder(os);
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodec.java Wed Jan 22 07:19:45 2014
@@ -55,7 +55,8 @@ public class KeyValueCodec implements Co
checkFlushed();
// This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
// make expensive copy.
- KeyValue.oswrite((KeyValue)KeyValueUtil.ensureKeyValue(cell), this.out);
+ // Do not write tags over RPC
+ KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, false);
}
}
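A round-trip through the stock codec therefore loses tags, which the tests for the new tag-aware codecs (below) contrast; a minimal sketch (constructor usage mirrors those tests):

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Codec.Encoder encoder = new KeyValueCodec().getEncoder(baos);
    encoder.write(new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        HConstants.LATEST_TIMESTAMP, Bytes.toBytes("v"),
        new Tag[] { new Tag((byte) 1, Bytes.toBytes("secret")) }));
    encoder.flush();
    // The serialized bytes carry no tag block; decoding yields a tag-less KeyValue.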
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/codec/KeyValueCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+
+/**
+ * Codec that does KeyValue version 1 serialization, serializing tags as well.
+ *
+ * <p>
+ * Encodes by casting Cell to KeyValue and writing out the backing array with a length prefix. This
+ * is how KVs were serialized in Puts, Deletes and Results pre-0.96. It's what would happen if you
+ * called the Writable#write KeyValue implementation. This encoder will fail if the passed Cell is
+ * not an old-school pre-0.96 KeyValue. It does not copy bytes when writing; it writes them directly
+ * to the passed stream.
+ *
+ * <p>
+ * If you wrote two KeyValues to this encoder, it would look like this in the stream:
+ *
+ * <pre>
+ * length-of-KeyValue1 // A java int with the length of KeyValue1 backing array
+ * KeyValue1 backing array filled with a KeyValue serialized in its particular format
+ * length-of-KeyValue2
+ * KeyValue2 backing array
+ * </pre>
+ *
+ * Note: The only difference between this and KeyValueCodec is that the latter ignores tags in
+ * KeyValues. <b>Use this Codec only on the server side.</b>
+ */
+@InterfaceAudience.Private
+public class KeyValueCodecWithTags implements Codec {
+ public static class KeyValueEncoder extends BaseEncoder {
+ public KeyValueEncoder(final OutputStream out) {
+ super(out);
+ }
+
+ @Override
+ public void write(Cell cell) throws IOException {
+ checkFlushed();
+ // This is crass and will not work when KV changes. Also if passed a non-kv Cell, it will
+ // make expensive copy.
+ // Write tags
+ KeyValue.oswrite((KeyValue) KeyValueUtil.ensureKeyValue(cell), this.out, true);
+ }
+ }
+
+ public static class KeyValueDecoder extends BaseDecoder {
+ public KeyValueDecoder(final InputStream in) {
+ super(in);
+ }
+
+ protected Cell parseCell() throws IOException {
+ return KeyValue.iscreate(in);
+ }
+ }
+
+ /**
+ * Implementation depends on {@link InputStream#available()}
+ */
+ @Override
+ public Decoder getDecoder(final InputStream is) {
+ return new KeyValueDecoder(is);
+ }
+
+ @Override
+ public Encoder getEncoder(OutputStream os) {
+ return new KeyValueEncoder(os);
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestCellCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestCellCodecWithTags {
+
+ @Test
+ public void testCellWithTag() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ CountingOutputStream cos = new CountingOutputStream(baos);
+ DataOutputStream dos = new DataOutputStream(cos);
+ Codec codec = new CellCodecWithTags();
+ Codec.Encoder encoder = codec.getEncoder(dos);
+ final Cell cell1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
+ new Tag((byte) 1, Bytes.toBytes("teststring1")),
+ new Tag((byte) 2, Bytes.toBytes("teststring2")) });
+ final Cell cell2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
+ Bytes.toBytes("teststring3")), });
+ final Cell cell3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
+ new Tag((byte) 2, Bytes.toBytes("teststring4")),
+ new Tag((byte) 2, Bytes.toBytes("teststring5")),
+ new Tag((byte) 1, Bytes.toBytes("teststring6")) });
+
+ encoder.write(cell1);
+ encoder.write(cell2);
+ encoder.write(cell3);
+ encoder.flush();
+ dos.close();
+ long offset = cos.getCount();
+ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ DataInputStream dis = new DataInputStream(cis);
+ Codec.Decoder decoder = codec.getDecoder(dis);
+ assertTrue(decoder.advance());
+ Cell c = decoder.current();
+ assertTrue(CellComparator.equals(c, cell1));
+ List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(2, tags.size());
+ Tag tag = tags.get(0);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
+ tag = tags.get(1);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, cell2));
+ tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(1, tags.size());
+ tag = tags.get(0);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, cell3));
+ tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(3, tags.size());
+ tag = tags.get(0);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
+ tag = tags.get(1);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
+ tag = tags.get(2);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
+ assertFalse(decoder.advance());
+ dis.close();
+ assertEquals(offset, cis.getCount());
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java?rev=1560265&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java (added)
+++ hbase/trunk/hbase-common/src/test/java/org/apache/hadoop/hbase/codec/TestKeyValueCodecWithTags.java Wed Jan 22 07:19:45 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.io.CountingInputStream;
+import com.google.common.io.CountingOutputStream;
+
+@Category(SmallTests.class)
+public class TestKeyValueCodecWithTags {
+
+ @Test
+ public void testKeyValueWithTag() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ CountingOutputStream cos = new CountingOutputStream(baos);
+ DataOutputStream dos = new DataOutputStream(cos);
+ Codec codec = new KeyValueCodecWithTags();
+ Codec.Encoder encoder = codec.getEncoder(dos);
+ final KeyValue kv1 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("1"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("1"), new Tag[] {
+ new Tag((byte) 1, Bytes.toBytes("teststring1")),
+ new Tag((byte) 2, Bytes.toBytes("teststring2")) });
+ final KeyValue kv2 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("2"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("2"), new Tag[] { new Tag((byte) 1,
+ Bytes.toBytes("teststring3")), });
+ final KeyValue kv3 = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("3"),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes("3"), new Tag[] {
+ new Tag((byte) 2, Bytes.toBytes("teststring4")),
+ new Tag((byte) 2, Bytes.toBytes("teststring5")),
+ new Tag((byte) 1, Bytes.toBytes("teststring6")) });
+
+ encoder.write(kv1);
+ encoder.write(kv2);
+ encoder.write(kv3);
+ encoder.flush();
+ dos.close();
+ long offset = cos.getCount();
+ CountingInputStream cis = new CountingInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ DataInputStream dis = new DataInputStream(cis);
+ Codec.Decoder decoder = codec.getDecoder(dis);
+ assertTrue(decoder.advance());
+ Cell c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv1));
+ List<Tag> tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(2, tags.size());
+ Tag tag = tags.get(0);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring1"), tag.getValue()));
+ tag = tags.get(1);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring2"), tag.getValue()));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv2));
+ tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(1, tags.size());
+ tag = tags.get(0);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring3"), tag.getValue()));
+ assertTrue(decoder.advance());
+ c = decoder.current();
+ assertTrue(CellComparator.equals(c, kv3));
+ tags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+ assertEquals(3, tags.size());
+ tag = tags.get(0);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring4"), tag.getValue()));
+ tag = tags.get(1);
+ assertEquals(2, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring5"), tag.getValue()));
+ tag = tags.get(2);
+ assertEquals(1, tag.getType());
+ assertTrue(Bytes.equals(Bytes.toBytes("teststring6"), tag.getValue()));
+ assertFalse(decoder.advance());
+ dis.close();
+ assertEquals(offset, cis.getCount());
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java Wed Jan 22 07:19:45 2014
@@ -293,14 +293,16 @@ public class WALCellCodec implements Cod
}
}
- public class EnsureKvEncoder extends KeyValueCodec.KeyValueEncoder {
+ public class EnsureKvEncoder extends BaseEncoder {
public EnsureKvEncoder(OutputStream out) {
super(out);
}
@Override
public void write(Cell cell) throws IOException {
if (!(cell instanceof KeyValue)) throw new IOException("Cannot write non-KV cells to WAL");
- super.write(cell);
+ checkFlushed();
+ // Make sure to write tags into WAL
+ KeyValue.oswrite((KeyValue) cell, this.out, true);
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Wed Jan 22 07:19:45 2014
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -98,6 +99,10 @@ public class ReplicationSink {
this.conf.getInt("replication.sink.client.retries.number", 4));
this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
this.conf.getInt("replication.sink.client.ops.timeout", 10000));
+ String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+ if (StringUtils.isNotEmpty(replicationCodec)) {
+ this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+ }
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Jan 22 07:19:45 2014
@@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -38,6 +39,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
@@ -144,7 +147,8 @@ public class ReplicationSource extends T
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId) throws IOException {
this.stopper = stopper;
- this.conf = conf;
+ this.conf = HBaseConfiguration.create(conf);
+ decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
@@ -154,12 +158,12 @@ public class ReplicationSource extends T
maxRetriesMultiplier * maxRetriesMultiplier);
this.queue =
new PriorityBlockingQueue<Path>(
- conf.getInt("hbase.regionserver.maxlogs", 32),
+ this.conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
// TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use
// passing Cells.
- this.conn = HConnectionManager.getConnection(conf);
+ this.conn = HConnectionManager.getConnection(this.conf);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
@@ -174,10 +178,16 @@ public class ReplicationSource extends T
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
- this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, this.conf);
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
}
+ private void decorateConf() {
+ String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
+ if (StringUtils.isNotEmpty(replicationCodec)) {
+ this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
+ }
+ }
@Override
public void enqueueLog(Path log) {
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java Wed Jan 22 07:19:45 2014
@@ -1180,7 +1180,9 @@ public class PerformanceEvaluation exten
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
@@ -1239,7 +1241,8 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
- Put put = new Put(format(i));
+ byte[] row = format(i);
+ Put put = new Put(row);
byte[] value = generateData(this.rand, VALUE_LENGTH);
if (useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
@@ -1248,7 +1251,9 @@ public class PerformanceEvaluation exten
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java Wed Jan 22 07:19:45 2014
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
@@ -148,11 +149,13 @@ public class TestEncodedSeekers {
Put put = new Put(key);
byte[] col = Bytes.toBytes(String.valueOf(j));
byte[] value = dataGenerator.generateRandomSizeValue(key, col);
- put.add(CF_BYTES, col, value);
- if(includeTags) {
+ if (includeTags) {
Tag[] tag = new Tag[1];
- tag[0] = new Tag((byte)1, "Visibility");
- put.add(CF_BYTES, col, value, tag);
+ tag[0] = new Tag((byte) 1, "Visibility");
+ KeyValue kv = new KeyValue(key, CF_BYTES, col, HConstants.LATEST_TIMESTAMP, value, tag);
+ put.add(kv);
+ } else {
+ put.add(CF_BYTES, col, value);
}
if(VERBOSE){
KeyValue kvPut = new KeyValue(key, CF_BYTES, col, value);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Wed Jan 22 07:19:45 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Tag;
@@ -389,7 +390,9 @@ public class TestCacheOnWrite {
Tag t = new Tag((byte) 1, "visibility");
Tag[] tags = new Tag[1];
tags[0] = t;
- p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr), tags);
+ KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+ HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
+ p.add(kv);
} else {
p.add(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java Wed Jan 22 07:19:45 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Du
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -300,10 +301,8 @@ public class TestTags {
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
- Tag[] tags = new Tag[1];
- tags[0] = new Tag((byte) 1, "ram");
- put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value, tags);
- // put.setAttribute("visibility", Bytes.toBytes("myTag"));
+ put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value);
+ put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
@@ -336,8 +335,8 @@ public class TestTags {
table.put(put2);
put2 = new Put(rowe);
value2 = Bytes.toBytes("1000dfsddfdf");
- put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2, tags);
- // put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
+ put2.add(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
+ put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put2);
admin.flush(tableName.getName());
regions = TEST_UTIL.getHBaseCluster().getRegions(tableName.getName());
@@ -418,90 +417,116 @@ public class TestTags {
table = new HTable(TEST_UTIL.getConfiguration(), tableName);
Put put = new Put(row1);
byte[] v = Bytes.toBytes(2L);
- put.add(f, q, v, new Tag[] { new Tag((byte) 1, "tag1") });
+ put.add(f, q, v);
+ put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Increment increment = new Increment(row1);
increment.addColumn(f, q, 1L);
table.increment(increment);
+ TestCoprocessorForTags.checkTagPresence = true;
ResultScanner scanner = table.getScanner(new Scan());
Result result = scanner.next();
KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- List<Tag> tags = kv.getTags();
+ List<Tag> tags = TestCoprocessorForTags.tags;
assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
+
increment = new Increment(row1);
- increment.add(new KeyValue(row1, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ increment.add(new KeyValue(row1, f, q, 1234L, v));
+ increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
+ TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan());
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
put = new Put(row2);
v = Bytes.toBytes(2L);
put.add(f, q, v);
table.put(put);
increment = new Increment(row2);
- increment.add(new KeyValue(row2, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ increment.add(new KeyValue(row2, f, q, 1234L, v));
+ increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
Scan scan = new Scan();
scan.setStartRow(row2);
+ TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
// Test Append
byte[] row3 = Bytes.toBytes("r3");
put = new Put(row3);
- put.add(f, q, Bytes.toBytes("a"), new Tag[] { new Tag((byte) 1, "tag1") });
+ put.add(f, q, Bytes.toBytes("a"));
+ put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Append append = new Append(row3);
append.add(f, q, Bytes.toBytes("b"));
table.append(append);
scan = new Scan();
scan.setStartRow(row3);
+ TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
+
append = new Append(row3);
- append.add(new KeyValue(row3, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ append.add(new KeyValue(row3, f, q, 1234L, v));
+ append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
+ TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
byte[] row4 = Bytes.toBytes("r4");
put = new Put(row4);
put.add(f, q, Bytes.toBytes("a"));
table.put(put);
append = new Append(row4);
- append.add(new KeyValue(row4, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+ append.add(new KeyValue(row4, f, q, 1234L, v));
+ append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
scan = new Scan();
scan.setStartRow(row4);
+ TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(scan);
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
- tags = kv.getTags();
+ tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
} finally {
+ TestCoprocessorForTags.checkTagPresence = false;
+ TestCoprocessorForTags.tags = null;
if (table != null) {
table.close();
}
@@ -543,14 +568,22 @@ public class TestTags {
}
public static class TestCoprocessorForTags extends BaseRegionObserver {
+
+ public static boolean checkTagPresence = false;
+ public static List<Tag> tags = null;
+
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
- byte[] attribute = put.getAttribute("visibility");
+ updateMutationAddingTags(put);
+ }
+
+ private void updateMutationAddingTags(final Mutation m) {
+ byte[] attribute = m.getAttribute("visibility");
byte[] cf = null;
List<Cell> updatedCells = new ArrayList<Cell>();
if (attribute != null) {
- for (List<? extends Cell> edits : put.getFamilyCellMap().values()) {
+ for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (cf == null) {
@@ -567,10 +600,41 @@ public class TestTags {
((List<Cell>) updatedCells).add(newKV);
}
}
- put.getFamilyCellMap().remove(cf);
+ m.getFamilyCellMap().remove(cf);
// Update the family map
- put.getFamilyCellMap().put(cf, updatedCells);
+ m.getFamilyCellMap().put(cf, updatedCells);
+ }
+ }
+
+ @Override
+ public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
+ throws IOException {
+ updateMutationAddingTags(increment);
+ return super.preIncrement(e, increment);
+ }
+
+ @Override
+ public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
+ throws IOException {
+ updateMutationAddingTags(append);
+ return super.preAppend(e, append);
+ }
+
+ @Override
+ public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
+ InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
+ if (checkTagPresence) {
+ if (results.size() > 0) {
+ // Check tag presence in the 1st cell in 1st Result
+ Result result = results.get(0);
+ CellScanner cellScanner = result.cellScanner();
+ if (cellScanner.advance()) {
+ Cell cell = cellScanner.current();
+ tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
+ }
+ }
}
+ return hasMore;
}
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java?rev=1560265&r1=1560264&r2=1560265&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java Wed Jan 22 07:19:45 2014
@@ -44,7 +44,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Durability;
@@ -1100,7 +1102,9 @@ public class PerformanceEvaluation exten
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}
@@ -1159,7 +1163,8 @@ public class PerformanceEvaluation exten
@Override
void testRow(final int i) throws IOException {
- Put put = new Put(format(i));
+ byte[] row = format(i);
+ Put put = new Put(row);
byte[] value = generateData(this.rand, ROW_LENGTH);
if (useTags) {
byte[] tag = generateData(this.rand, TAG_LENGTH);
@@ -1168,7 +1173,9 @@ public class PerformanceEvaluation exten
Tag t = new Tag((byte) n, tag);
tags[n] = t;
}
- put.add(FAMILY_NAME, QUALIFIER_NAME, value, tags);
+ KeyValue kv = new KeyValue(row, FAMILY_NAME, QUALIFIER_NAME, HConstants.LATEST_TIMESTAMP,
+ value, tags);
+ put.add(kv);
} else {
put.add(FAMILY_NAME, QUALIFIER_NAME, value);
}