svn commit: r1499118 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/java/org/apach...
Author: stack
Date: Tue Jul 2 21:35:30 2013
New Revision: 1499118
URL: http://svn.apache.org/r1499118
Log:
HBASE-8737 [replication] Change replication RPC to use cell blocks
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java
hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java Tue Jul 2 21:35:30 2013
@@ -106,6 +106,23 @@ public class Delete extends Mutation imp
* @param rowArray We make a local copy of this passed in row.
* @param rowOffset
* @param rowLength
+ */
+ public Delete(final byte [] rowArray, final int rowOffset, final int rowLength) {
+ this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Create a Delete operation for the specified row and timestamp.<p>
+ *
+ * If no further operations are done, this will delete all columns in all
+ * families of the specified row with a timestamp less than or equal to the
+ * specified timestamp.<p>
+ *
+ * This timestamp is ONLY used for a delete row operation. If specifying
+ * families or columns, you must specify each timestamp individually.
+ * @param rowArray We make a local copy of this passed in row.
+ * @param rowOffset
+ * @param rowLength
* @param ts maximum version timestamp (only for delete row)
*/
public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) {
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Tue Jul 2 21:35:30 2013
@@ -70,6 +70,17 @@ public class Put extends Mutation implem
* @param rowLength
* @param ts
*/
+ public Put(byte [] rowArray, int rowOffset, int rowLength) {
+ this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+ * We make a copy of the passed in row key to keep local.
+ * @param rowArray
+ * @param rowOffset
+ * @param rowLength
+ * @param ts
+ */
public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) {
checkRow(rowArray, rowOffset, rowLength);
this.row = Bytes.copy(rowArray, rowOffset, rowLength);
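The new (rowArray, rowOffset, rowLength) constructors let a caller point a Put or Delete at a row slice inside a larger buffer, such as a Cell's backing array, without copying the row out first (the constructor makes its own copy, per the Bytes.copy call above). A minimal sketch, assuming a hypothetical caller class (RowSliceExample is not part of this commit):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowSliceExample {
      public static void main(String[] args) {
        // A Cell whose row lives inside a shared backing array.
        Cell cell = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), Bytes.toBytes("v"));
        // Point the mutations at the row slice; both constructors copy the
        // row internally, so the caller needs no intermediate byte[] copy.
        Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
        Delete delete = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
      }
    }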
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Tue Jul 2 21:35:30 2013
@@ -33,10 +33,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -47,49 +46,61 @@ import com.google.common.base.Preconditi
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
-import com.google.protobuf.TextFormat;
/**
* Utility to help ipc'ing.
*/
class IPCUtil {
public static final Log LOG = LogFactory.getLog(IPCUtil.class);
- private final int cellBlockBuildingInitialBufferSize;
/**
* How much we think the decompressor will expand the original compressed content.
*/
private final int cellBlockDecompressionMultiplier;
+ private final int cellBlockBuildingInitialBufferSize;
private final Configuration conf;
IPCUtil(final Configuration conf) {
super();
this.conf = conf;
- this.cellBlockBuildingInitialBufferSize =
- conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
this.cellBlockDecompressionMultiplier =
conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+ // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+ // #buildCellBlock.
+ this.cellBlockBuildingInitialBufferSize =
+ ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
}
/**
- * Build a cell block using passed in <code>codec</code>
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
* @param codec
* @param compressor
- * @Param cells
- * @return Null or byte buffer filled with passed-in Cells encoded using passed in
- * <code>codec</code>; the returned buffer has been flipped and is ready for
- * reading. Use limit to find total size.
+ * @param cellScanner
+ * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
+ * flipped and is ready for reading. Use limit to find total size.
* @throws IOException
*/
@SuppressWarnings("resource")
ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
- final CellScanner cells)
+ final CellScanner cellScanner)
throws IOException {
- if (cells == null) return null;
- // TOOD: Reuse buffers?
- // Presizing doesn't work because can't tell what size will be when serialized.
- // BBOS will resize itself.
- ByteBufferOutputStream baos =
- new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+ if (cellScanner == null) return null;
+ int bufferSize = this.cellBlockBuildingInitialBufferSize;
+ if (cellScanner instanceof HeapSize) {
+ long longSize = ((HeapSize)cellScanner).heapSize();
+ // Just make sure we don't have a size bigger than an int.
+ if (longSize > Integer.MAX_VALUE) {
+ throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+ }
+ bufferSize = ClassSize.align((int)longSize);
+ } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
+ // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
+ // total size before creating the buffer. It costs some small percentage. If we are usually
+ // within the estimated buffer size, then the cost is not worth it. If we are often well
+ // outside the guesstimated buffer size, the processing can be done in half the time if we
+ // go w/ the estimated size rather than let the buffer resize.
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
OutputStream os = baos;
Compressor poolCompressor = null;
try {
@@ -99,8 +110,8 @@ class IPCUtil {
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
- while (cells.advance()) {
- encoder.write(cells.current());
+ while (cellScanner.advance()) {
+ encoder.write(cellScanner.current());
}
encoder.flush();
} finally {
@@ -108,9 +119,9 @@ class IPCUtil {
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
}
if (LOG.isTraceEnabled()) {
- if (this.cellBlockBuildingInitialBufferSize < baos.size()) {
- LOG.trace("Buffer grew from " + this.cellBlockBuildingInitialBufferSize +
- " to " + baos.size());
+ if (bufferSize < baos.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
}
return baos.getByteBuffer();
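With the change above, buildCellBlock sizes its ByteBufferOutputStream from the scanner's own heapSize() when the scanner implements HeapSize, and only falls back to the configured 16k default otherwise. A sketch of the same sizing decision pulled out as a hypothetical helper (CellBlockSizing is illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.io.HeapSize;
    import org.apache.hadoop.hbase.util.ClassSize;

    public class CellBlockSizing {
      // Mirrors IPCUtil#buildCellBlock: use the scanner's reported size when
      // it implements HeapSize, else fall back to the configured default.
      static int chooseBufferSize(CellScanner cellScanner, int defaultSize)
      throws IOException {
        int bufferSize = defaultSize;
        if (cellScanner instanceof HeapSize) {
          long longSize = ((HeapSize)cellScanner).heapSize();
          // Guard against sizes that do not fit in an int.
          if (longSize > Integer.MAX_VALUE) {
            throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
          }
          bufferSize = ClassSize.align((int)longSize);
        }
        return bufferSize;
      }
    }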
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Tue Jul 2 21:35:30 2013
@@ -1290,4 +1290,4 @@ public final class RequestConverter {
}
return builder.build();
}
-}
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java Tue Jul 2 21:35:30 2013
@@ -23,21 +23,28 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import org.apache.commons.lang.time.StopWatch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mortbay.log.Log;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
@Category(SmallTests.class)
public class TestIPCUtil {
@@ -49,33 +56,137 @@ public class TestIPCUtil {
@Test
public void testBuildCellBlock() throws IOException {
- doBuildCellBlockUndoCellBlock(new KeyValueCodec(), null);
- doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new DefaultCodec());
- doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new GzipCodec());
+ doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
+ doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec());
+ doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec());
}
- void doBuildCellBlockUndoCellBlock(final Codec codec, final CompressionCodec compressor)
+ static void doBuildCellBlockUndoCellBlock(final IPCUtil util,
+ final Codec codec, final CompressionCodec compressor)
throws IOException {
- final int count = 10;
- Cell [] cells = getCells(count);
- ByteBuffer bb = this.util.buildCellBlock(codec, compressor,
- CellUtil.createCellScanner(Arrays.asList(cells).iterator()));
- CellScanner scanner =
- this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+ doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
+ }
+
+ static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec,
+ final CompressionCodec compressor, final int count, final int size, final boolean sized)
+ throws IOException {
+ Cell [] cells = getCells(count, size);
+ CellScanner cellScanner = sized? getSizedCellScanner(cells):
+ CellUtil.createCellScanner(Arrays.asList(cells).iterator());
+ ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
+ cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
int i = 0;
- while (scanner.advance()) {
+ while (cellScanner.advance()) {
i++;
}
assertEquals(count, i);
}
+ static CellScanner getSizedCellScanner(final Cell [] cells) {
+ int size = 0;
+ for (Cell cell: cells) {
+ size += CellUtil.estimatedSizeOf(cell);
+ }
+ final int totalSize = ClassSize.align(size);
+ final CellScanner cellScanner = CellUtil.createCellScanner(cells);
+ return new SizedCellScanner() {
+ @Override
+ public long heapSize() {
+ return totalSize;
+ }
+
+ @Override
+ public Cell current() {
+ return cellScanner.current();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return cellScanner.advance();
+ }
+ };
+ }
+
static Cell [] getCells(final int howMany) {
+ return getCells(howMany, 1024);
+ }
+
+ static Cell [] getCells(final int howMany, final int valueSize) {
Cell [] cells = new Cell[howMany];
+ byte [] value = new byte[valueSize];
for (int i = 0; i < howMany; i++) {
byte [] index = Bytes.toBytes(i);
- KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+ KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
cells[i] = kv;
}
return cells;
}
+
+ private static final String COUNT = "--count=";
+ private static final String SIZE = "--size=";
+
+ /**
+ * Prints usage and then exits w/ passed <code>errCode</code>
+ * @param errCode
+ */
+ private static void usage(final int errCode) {
+ System.out.println("Usage: IPCUtil [options]");
+ System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
+ System.out.println(" --count Count of Cells");
+ System.out.println(" --size Size of Cell values");
+ System.out.println("Example: IPCUtil --count=1024 --size=1024");
+ System.exit(errCode);
+ }
+
+ private static void timerTests(final IPCUtil util, final int count, final int size,
+ final Codec codec, final CompressionCodec compressor)
+ throws IOException {
+ final int cycles = 1000;
+ StopWatch timer = new StopWatch();
+ timer.start();
+ for (int i = 0; i < cycles; i++) {
+ timerTest(util, timer, count, size, codec, compressor, false);
+ }
+ timer.stop();
+ Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false +
+ ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+ timer.reset();
+ timer.start();
+ for (int i = 0; i < cycles; i++) {
+ timerTest(util, timer, count, size, codec, compressor, true);
+ }
+ timer.stop();
+ Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true +
+ ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+ }
+
+ private static void timerTest(final IPCUtil util, final StopWatch timer, final int count,
+ final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
+ throws IOException {
+ doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
+ }
+
+ /**
+ * For running a few tests of methods herein.
+ * @param args
+ * @throws IOException
+ */
+ public static void main(String[] args) throws IOException {
+ int count = 1024;
+ int size = 10240;
+ for (String arg: args) {
+ if (arg.startsWith(COUNT)) {
+ count = Integer.parseInt(arg.replace(COUNT, ""));
+ } else if (arg.startsWith(SIZE)) {
+ size = Integer.parseInt(arg.replace(SIZE, ""));
+ } else {
+ usage(1);
+ }
+ }
+ IPCUtil util = new IPCUtil(HBaseConfiguration.create());
+ ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
+ timerTests(util, count, size, new KeyValueCodec(), null);
+ timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
+ timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
+ }
}
\ No newline at end of file
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java Tue Jul 2 21:35:30 2013
@@ -28,6 +28,7 @@ import java.util.NavigableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* Utility methods helpful slinging {@link Cell} instances.
@@ -242,4 +243,44 @@ public final class CellUtil {
}
};
}
-}
+
+ /**
+ * @param left
+ * @param right
+ * @return True if the rows in <code>left</code> and <code>right</code> Cells match
+ */
+ public static boolean matchingRow(final Cell left, final Cell right) {
+ return Bytes.equals(left.getRowArray(), left.getRowOffset(), left.getRowLength(),
+ right.getRowArray(), right.getRowOffset(), right.getRowLength());
+ }
+
+ /**
+ * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+ * a {KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
+ * KeyValue type.
+ */
+ public static boolean isDelete(final Cell cell) {
+ return KeyValue.isDelete(cell.getTypeByte());
+ }
+
+ /**
+ * @param cell
+ * @return Estimate of the <code>cell</code> size in bytes.
+ */
+ public static int estimatedSizeOf(final Cell cell) {
+ // If a KeyValue, we can give a good estimate of size.
+ if (cell instanceof KeyValue) {
+ return ((KeyValue)cell).getLength() + Bytes.SIZEOF_INT;
+ }
+ // TODO: Should we add to Cell a sizeOf? Would it help? Does it make sense if Cell is
+ // prefix encoded or compressed?
+ return cell.getRowLength() + cell.getFamilyLength() +
+ cell.getQualifierLength() +
+ cell.getValueLength() +
+ // Use the KeyValue's infrastructure size presuming that another implementation would have
+ // same basic cost.
+ KeyValue.KEY_INFRASTRUCTURE_SIZE +
+ // Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
+ Bytes.SIZEOF_INT;
+ }
+}
\ No newline at end of file
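Summing estimatedSizeOf over the Cells to be shipped is how a caller can precompute the size a SizedCellScanner reports. A small sketch, with a hypothetical helper name:

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.util.ClassSize;

    public class SizeEstimate {
      // Sum the per-Cell estimates and align the total the same way IPCUtil
      // aligns its initial buffer size.
      static int estimateTotal(Cell[] cells) {
        int size = 0;
        for (Cell cell : cells) {
          size += CellUtil.estimatedSizeOf(cell);
        }
        return ClassSize.align(size);
      }
    }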
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Jul 2 21:35:30 2013
@@ -20,9 +20,7 @@
package org.apache.hadoop.hbase;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
-import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java Tue Jul 2 21:35:30 2013
@@ -46,5 +46,4 @@ public interface HeapSize {
* count of payload and hosting object sizings.
*/
public long heapSize();
-
-}
+}
\ No newline at end of file
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java?rev=1499118&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java Tue Jul 2 21:35:30 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.hbase.CellScanner;
+
+/**
+ * A CellScanner that knows its size in memory in bytes.
+ * Used when playing the CellScanner into an in-memory buffer; knowing the size ahead of time saves
+ * on background buffer resizings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface SizedCellScanner extends CellScanner, HeapSize {}
\ No newline at end of file
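An implementation is just a CellScanner delegate plus a precomputed size, along the lines of the helper added to TestIPCUtil above; a minimal sketch (the wrapper class name is an assumption):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.io.SizedCellScanner;

    public class SizedWrapper {
      // Wrap a Cell array as a SizedCellScanner; totalSize is the precomputed
      // size estimate the receiver uses to presize its buffer.
      static SizedCellScanner wrap(final Cell[] cells, final long totalSize) {
        final CellScanner delegate = CellUtil.createCellScanner(cells);
        return new SizedCellScanner() {
          @Override
          public long heapSize() {
            return totalSize;
          }

          @Override
          public Cell current() {
            return delegate.current();
          }

          @Override
          public boolean advance() throws IOException {
            return delegate.advance();
          }
        };
      }
    }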
Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java Tue Jul 2 21:35:30 2013
@@ -9894,6 +9894,10 @@ public final class AdminProtos {
java.util.List<com.google.protobuf.ByteString> getKeyValueBytesList();
int getKeyValueBytesCount();
com.google.protobuf.ByteString getKeyValueBytes(int index);
+
+ // optional int32 associatedCellCount = 3;
+ boolean hasAssociatedCellCount();
+ int getAssociatedCellCount();
}
public static final class WALEntry extends
com.google.protobuf.GeneratedMessage
@@ -9951,9 +9955,20 @@ public final class AdminProtos {
return keyValueBytes_.get(index);
}
+ // optional int32 associatedCellCount = 3;
+ public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3;
+ private int associatedCellCount_;
+ public boolean hasAssociatedCellCount() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ public int getAssociatedCellCount() {
+ return associatedCellCount_;
+ }
+
private void initFields() {
key_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance();
keyValueBytes_ = java.util.Collections.emptyList();;
+ associatedCellCount_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -9981,6 +9996,9 @@ public final class AdminProtos {
for (int i = 0; i < keyValueBytes_.size(); i++) {
output.writeBytes(2, keyValueBytes_.get(i));
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeInt32(3, associatedCellCount_);
+ }
getUnknownFields().writeTo(output);
}
@@ -10003,6 +10021,10 @@ public final class AdminProtos {
size += dataSize;
size += 1 * getKeyValueBytesList().size();
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt32Size(3, associatedCellCount_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -10033,6 +10055,11 @@ public final class AdminProtos {
}
result = result && getKeyValueBytesList()
.equals(other.getKeyValueBytesList());
+ result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount());
+ if (hasAssociatedCellCount()) {
+ result = result && (getAssociatedCellCount()
+ == other.getAssociatedCellCount());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -10050,6 +10077,10 @@ public final class AdminProtos {
hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER;
hash = (53 * hash) + getKeyValueBytesList().hashCode();
}
+ if (hasAssociatedCellCount()) {
+ hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER;
+ hash = (53 * hash) + getAssociatedCellCount();
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
@@ -10175,6 +10206,8 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000001);
keyValueBytes_ = java.util.Collections.emptyList();;
bitField0_ = (bitField0_ & ~0x00000002);
+ associatedCellCount_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -10226,6 +10259,10 @@ public final class AdminProtos {
bitField0_ = (bitField0_ & ~0x00000002);
}
result.keyValueBytes_ = keyValueBytes_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.associatedCellCount_ = associatedCellCount_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -10255,6 +10292,9 @@ public final class AdminProtos {
}
onChanged();
}
+ if (other.hasAssociatedCellCount()) {
+ setAssociatedCellCount(other.getAssociatedCellCount());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -10308,6 +10348,11 @@ public final class AdminProtos {
keyValueBytes_.add(input.readBytes());
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ associatedCellCount_ = input.readInt32();
+ break;
+ }
}
}
}
@@ -10455,6 +10500,27 @@ public final class AdminProtos {
return this;
}
+ // optional int32 associatedCellCount = 3;
+ private int associatedCellCount_ ;
+ public boolean hasAssociatedCellCount() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ public int getAssociatedCellCount() {
+ return associatedCellCount_;
+ }
+ public Builder setAssociatedCellCount(int value) {
+ bitField0_ |= 0x00000004;
+ associatedCellCount_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearAssociatedCellCount() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ associatedCellCount_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:WALEntry)
}
@@ -15359,41 +15425,41 @@ public final class AdminProtos {
"gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" +
"ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" +
"er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" +
- "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W",
- "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" +
- "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" +
- "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" +
- "ALWriterRequest\".\n\025RollWALWriterResponse" +
- "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" +
- "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" +
- "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" +
- "\022\037\n\nserverName\030\001 \002(\0132\013.ServerName\022\021\n\tweb" +
- "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" +
- "\nserverInfo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014Admin",
- "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" +
- "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" +
- "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" +
- "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" +
- "ineRegionRequest\032\030.GetOnlineRegionRespon" +
- "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" +
- "penRegionResponse\0228\n\013closeRegion\022\023.Close" +
- "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" +
- "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" +
- "egionResponse\0228\n\013splitRegion\022\023.SplitRegi",
- "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" +
- "ctRegion\022\025.CompactRegionRequest\032\026.Compac" +
- "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" +
- "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" +
- "replicateWALEntry\022\031.ReplicateWALEntryReq" +
- "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" +
- "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" +
- "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" +
- "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" +
- "ServerInfoRequest\032\026.GetServerInfoRespons",
- "e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" +
- "opServerResponseBA\n*org.apache.hadoop.hb" +
- "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" +
- "\240\001\001"
+ "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W",
+ "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" +
+ "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" +
+ "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" +
+ "cateWALEntryResponse\"\026\n\024RollWALWriterReq" +
+ "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" +
+ "oFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006rea" +
+ "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" +
+ "rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" +
+ "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" +
+ "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo",
+ "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" +
+ "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" +
+ "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" +
+ "etStoreFileRequest\032\025.GetStoreFileRespons" +
+ "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" +
+ "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" +
+ "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" +
+ "sponse\0228\n\013closeRegion\022\023.CloseRegionReque" +
+ "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" +
+ "\023.FlushRegionRequest\032\024.FlushRegionRespon",
+ "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" +
+ ".SplitRegionResponse\022>\n\rcompactRegion\022\025." +
+ "CompactRegionRequest\032\026.CompactRegionResp" +
+ "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" +
+ "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" +
+ "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" +
+ "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" +
+ "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" +
+ "\025.RollWALWriterRequest\032\026.RollWALWriterRe" +
+ "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR",
+ "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" +
+ "rver\022\022.StopServerRequest\032\023.StopServerRes" +
+ "ponseBA\n*org.apache.hadoop.hbase.protobu" +
+ "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15557,7 +15623,7 @@ public final class AdminProtos {
internal_static_WALEntry_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALEntry_descriptor,
- new java.lang.String[] { "Key", "KeyValueBytes", },
+ new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", },
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class,
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class);
internal_static_ReplicateWALEntryRequest_descriptor =
Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto Tue Jul 2 21:35:30 2013
@@ -161,14 +161,18 @@ message MergeRegionsResponse {
// Protocol buffer version of WAL for replication
message WALEntry {
required WALKey key = 1;
+ // Following may be null if the KVs/Cells are carried along the side in a cellblock (See
+ // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this next field is null
+ // and associatedCellCount has count of Cells associated w/ this WALEntry
repeated bytes keyValueBytes = 2;
+ // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock.
+ optional int32 associatedCellCount = 3;
}
/**
* Replicates the given entries. The guarantee is that the given entries
* will be durable on the slave cluster if this method returns without
- * any exception.
- * hbase.replication has to be set to true for this to work.
+ * any exception. hbase.replication has to be set to true for this to work.
*/
message ReplicateWALEntryRequest {
repeated WALEntry entry = 1;
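On the receive side, associatedCellCount says how many Cells to pull off the side-channel CellScanner for each WALEntry, which is what the new ReplicationSink code below does. A condensed sketch of that consumption loop (the helper class is hypothetical):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;

    public class WALEntryCells {
      // Pull the Cells that belong to one WALEntry off the shared CellScanner.
      static List<Cell> cellsFor(WALEntry entry, CellScanner cells) throws IOException {
        int count = entry.getAssociatedCellCount();
        List<Cell> out = new ArrayList<Cell>(count);
        for (int i = 0; i < count; i++) {
          if (!cells.advance()) {
            // Count and cellblock disagree; fail loudly as ReplicationSink does.
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          out.add(cells.current());
        }
        return out;
      }
    }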
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Tue Jul 2 21:35:30 2013
@@ -20,26 +20,32 @@
package org.apache.hadoop.hbase.protobuf;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.UUID;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
public class ReplicationProtbufUtil {
/**
@@ -81,10 +87,11 @@ public class ReplicationProtbufUtil {
*/
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
final HLog.Entry[] entries) throws IOException {
- AdminProtos.ReplicateWALEntryRequest request =
+ Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries);
try {
- admin.replicateWALEntry(null, request);
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+ admin.replicateWALEntry(controller, p.getFirst());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
@@ -94,10 +101,14 @@ public class ReplicationProtbufUtil {
* Create a new ReplicateWALEntryRequest from a list of HLog entries
*
* @param entries the HLog entries to be replicated
- * @return a ReplicateWALEntryRequest
+ * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
+ * found.
*/
- public static AdminProtos.ReplicateWALEntryRequest
+ public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+ // Accumulate all the KVs seen in here.
+ List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
+ int size = 0;
WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
AdminProtos.ReplicateWALEntryRequest.Builder builder =
@@ -128,13 +139,55 @@ public class ReplicationProtbufUtil {
keyBuilder.addScopes(scopeBuilder.build());
}
}
- List<KeyValue> keyValues = edit.getKeyValues();
- for (KeyValue value: keyValues) {
- entryBuilder.addKeyValueBytes(ByteString.copyFrom(
- value.getBuffer(), value.getOffset(), value.getLength()));
+ List<KeyValue> kvs = edit.getKeyValues();
+ // Add up the size. It is used later when serializing out the kvs.
+ for (KeyValue kv: kvs) {
+ size += kv.getLength();
}
+ // Collect up the kvs
+ allkvs.add(kvs);
+ // Write out how many kvs associated with this entry.
+ entryBuilder.setAssociatedCellCount(kvs.size());
builder.addEntry(entryBuilder.build());
}
- return builder.build();
+ return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
+ getCellScanner(allkvs, size));
+ }
+
+ /**
+ * @param cells
+ * @return <code>cells</code> packaged as a CellScanner
+ */
+ static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
+ return new SizedCellScanner() {
+ private final Iterator<List<? extends Cell>> entries = cells.iterator();
+ private Iterator<? extends Cell> currentIterator = null;
+ private Cell currentCell;
+
+ @Override
+ public Cell current() {
+ return this.currentCell;
+ }
+
+ @Override
+ public boolean advance() {
+ if (this.currentIterator == null) {
+ if (!this.entries.hasNext()) return false;
+ this.currentIterator = this.entries.next().iterator();
+ }
+ if (this.currentIterator.hasNext()) {
+ this.currentCell = this.currentIterator.next();
+ return true;
+ }
+ this.currentCell = null;
+ this.currentIterator = null;
+ return advance();
+ }
+
+ @Override
+ public long heapSize() {
+ return size;
+ }
+ };
}
-}
+}
\ No newline at end of file
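A caller now gets a Pair of request and CellScanner and hands the scanner to the rpc controller so the Cells ride out as a cellblock; this is what replicateWALEntry above does. A usage sketch (the wrapper class name is an assumption):

    import java.io.IOException;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
    import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
    import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.util.Pair;
    import com.google.protobuf.ServiceException;

    public class ShipEntries {
      // Build the pb request plus the side-channel CellScanner, then send
      // both; the controller carries the Cells out as a cellblock.
      static void ship(AdminService.BlockingInterface admin, HLog.Entry[] entries)
      throws IOException, ServiceException {
        Pair<ReplicateWALEntryRequest, CellScanner> p =
          ReplicationProtbufUtil.buildReplicateWALEntryRequest(entries);
        PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
        admin.replicateWALEntry(controller, p.getFirst());
      }
    }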
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul 2 21:35:30 2013
@@ -2219,8 +2219,7 @@ public class HRegionServer implements Cl
conf, server, fs, logDir, oldLogDir);
server.replicationSinkHandler = (ReplicationSinkService)
server.replicationSourceHandler;
- }
- else {
+ } else {
server.replicationSourceHandler = (ReplicationSourceService)
newReplicationInstance(sourceClassname,
conf, server, fs, logDir, oldLogDir);
@@ -3715,15 +3714,14 @@ public class HRegionServer implements Cl
@Override
@QosPriority(priority=HConstants.REPLICATION_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
- final ReplicateWALEntryRequest request) throws ServiceException {
+ final ReplicateWALEntryRequest request)
+ throws ServiceException {
try {
if (replicationSinkHandler != null) {
checkOpen();
requestCount.increment();
- HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList());
- if (entries != null && entries.length > 0) {
- replicationSinkHandler.replicateLogEntries(entries);
- }
+ this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
+ ((PayloadCarryingRpcController)controller).cellScanner());
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java Tue Jul 2 21:35:30 2013
@@ -19,9 +19,11 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
/**
* A sink for a replication stream has to expose this service.
@@ -30,11 +32,11 @@ import org.apache.hadoop.hbase.regionser
*/
@InterfaceAudience.Private
public interface ReplicationSinkService extends ReplicationService {
-
- /**
+ /**
* Carry on the list of log entries down to the sink
- * @param entries list of entries to replicate
+ * @param entries list of WALEntries to replicate
+ * @param cells Cells that the WALEntries refer to (if cells is non-null)
* @throws IOException
*/
- public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
-}
+ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java Tue Jul 2 21:35:30 2013
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.io.IOException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -30,10 +28,9 @@ import org.apache.hadoop.hbase.regionser
*/
@InterfaceAudience.Private
public interface ReplicationSourceService extends ReplicationService {
-
/**
* Returns a WALObserver for the service. This is needed to
* observe log rolls and log archival events.
*/
public WALActionsListener getWALActionsListener();
-}
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Tue Jul 2 21:35:30 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
+import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Executors;
@@ -33,13 +34,14 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -163,11 +165,14 @@ public class Replication implements WALA
/**
* Carry on the list of log entries down to the sink
* @param entries list of entries to replicate
+ * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
+ * do not contain the Cells we are replicating; they are passed here on the side in this
+ * CellScanner).
* @throws IOException
*/
- public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+ public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
if (this.replication) {
- this.replicationSink.replicateEntries(entries);
+ this.replicationSink.replicateEntries(entries, cells);
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jul 2 21:35:30 2013
@@ -34,19 +34,23 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
@@ -108,17 +112,17 @@ public class ReplicationSink {
}
/**
- * Replicate this array of entries directly into the local cluster
- * using the native client.
+ * Replicate this array of entries directly into the local cluster using the native client.
+ * Like {@link #replicateEntries(org.apache.hadoop.hbase.regionserver.wal.HLog.Entry[])} except it
+ * operates against the raw protobuf types, saving a conversion from pb to pojo.
*
* @param entries
+ * @param cells
* @throws IOException
*/
- public void replicateEntries(HLog.Entry[] entries)
- throws IOException {
- if (entries.length == 0) {
- return;
- }
+ public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+ if (entries.isEmpty()) return;
+ if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
// Very simple optimization where we batch sequences of rows going
// to the same table.
try {
@@ -126,40 +130,41 @@ public class ReplicationSink {
// Map of table => list of Rows, we only want to flushCommits once per
// invocation of this method per table.
Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
- for (HLog.Entry entry : entries) {
- WALEdit edit = entry.getEdit();
- byte[] table = entry.getKey().getTablename();
- Put put = null;
- Delete del = null;
- KeyValue lastKV = null;
- List<KeyValue> kvs = edit.getKeyValues();
- for (KeyValue kv : kvs) {
- if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
- if (kv.isDelete()) {
- del = new Delete(kv.getRow());
- del.setClusterId(entry.getKey().getClusterId());
- addToMultiMap(rows, table, del);
- } else {
- put = new Put(kv.getRow());
- put.setClusterId(entry.getKey().getClusterId());
- addToMultiMap(rows, table, put);
- }
+ for (WALEntry entry : entries) {
+ byte[] table = entry.getKey().getTableName().toByteArray();
+ Cell previousCell = null;
+ Mutation m = null;
+ java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
+ int count = entry.getAssociatedCellCount();
+ for (int i = 0; i < count; i++) {
+ // Throw index out of bounds if our cell count is off
+ if (!cells.advance()) {
+ throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
- if (kv.isDelete()) {
- del.addDeleteMarker(kv);
+ Cell cell = cells.current();
+ if (isNewRowOrType(previousCell, cell)) {
+ // Create new mutation
+ m = CellUtil.isDelete(cell)?
+ new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
+ new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ m.setClusterId(uuid);
+ addToMultiMap(rows, table, m);
+ }
+ if (CellUtil.isDelete(cell)) {
+ ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
} else {
- put.add(kv);
+ ((Put)m).add(KeyValueUtil.ensureKeyValue(cell));
}
- lastKV = kv;
+ previousCell = cell;
}
totalReplicated++;
}
for (Entry<byte[], List<Row>> entry : rows.entrySet()) {
batch(entry.getKey(), entry.getValue());
}
- this.metrics.setAgeOfLastAppliedOp(
- entries[entries.length-1].getKey().getWriteTime());
- this.metrics.applyBatch(entries.length);
+ int size = entries.size();
+ this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
+ this.metrics.applyBatch(size);
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
@@ -168,6 +173,20 @@ public class ReplicationSink {
}
/**
+ * @param previousCell
+ * @param cell
+ * @return True if we have crossed over onto a new row or type
+ */
+ private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
+ return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
+ !CellUtil.matchingRow(previousCell, cell);
+ }
+
+ private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
+ return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+ }
+
+ /**
* Simple helper to a map from key to (a list of) values
* TODO: Make a general utility method
* @param map
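The grouping above starts a new Mutation whenever the row or the KeyValue type changes between consecutive Cells, so runs of Puts (or Deletes) on the same row collapse into one Mutation. A small, self-contained check of the predicate (class name is illustrative):

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class RowOrTypeCheck {
      // Same predicate as ReplicationSink#isNewRowOrType: a new Mutation
      // starts when either the row or the KeyValue type byte changes.
      static boolean isNewRowOrType(Cell previousCell, Cell cell) {
        return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
          !CellUtil.matchingRow(previousCell, cell);
      }

      public static void main(String[] args) {
        Cell put1 = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
            Bytes.toBytes("q1"), Bytes.toBytes("v"));
        Cell put2 = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"),
            Bytes.toBytes("q2"), Bytes.toBytes("v"));
        // Same row, same Put type: the second Cell folds into the current Put.
        System.out.println(isNewRowOrType(put1, put2)); // prints false
      }
    }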
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Jul 2 21:35:30 2013
@@ -177,6 +177,9 @@ public class ReplicationSource extends T
new PriorityBlockingQueue<Path>(
conf.getInt("hbase.regionserver.maxlogs", 32),
new LogsComparator());
+ // TODO: This connection is replication specific or we should make it particular to
+ // replication and make replication specific settings such as compression or codec to use
+ // passing Cells.
this.conn = HConnectionManager.getConnection(conf);
this.zkHelper = manager.getRepZkWrapper();
this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
@@ -456,7 +459,6 @@ public class ReplicationSource extends T
// Connect to peer cluster first, unless we have to stop
while (this.isActive() && this.currentPeers.size() == 0) {
-
chooseSinks();
if (this.isActive() && this.currentPeers.size() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java?rev=1499118&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java Tue Jul 2 21:35:30 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(SmallTests.class)
+public class TestReplicationProtobuf {
+ /**
+ * Little test to check we can basically convert a list of lists of KVs into a CellScanner
+ * @throws IOException
+ */
+ @Test
+ public void testGetCellScanner() throws IOException {
+ List<KeyValue> a = new ArrayList<KeyValue>();
+ KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L);
+ a.add(akv);
+ // Add a few just to make it less regular.
+ a.add(new KeyValue(Bytes.toBytes("aa"), -1L));
+ a.add(new KeyValue(Bytes.toBytes("aaa"), -1L));
+ List<KeyValue> b = new ArrayList<KeyValue>();
+ KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L);
+ b.add(bkv);
+ List<KeyValue> c = new ArrayList<KeyValue>();
+ KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L);
+ c.add(ckv);
+ List<List<? extends Cell>> all = new ArrayList<List<? extends Cell>>();
+ all.add(a);
+ all.add(b);
+ all.add(c);
+ CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all, 0);
+ testAdvanceHasSameRow(scanner, akv);
+ // Skip over aa
+ scanner.advance();
+ // Skip over aaa
+ scanner.advance();
+ testAdvanceHasSameRow(scanner, bkv);
+ testAdvanceHasSameRow(scanner, ckv);
+ assertFalse(scanner.advance());
+ }
+
+ private void testAdvanceHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException {
+ scanner.advance();
+ assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(),
+ scanner.current().getRowLength(),
+ kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java Tue Jul 2 21:35:30 2013
@@ -449,13 +449,14 @@ public class TestReplicationSmallTests e
scan = new Scan();
+ long start = System.currentTimeMillis();
for (int i = 0; i < NB_RETRIES; i++) {
scanner = htable2.getScanner(scan);
res = scanner.next(NB_ROWS_IN_BIG_BATCH);
scanner.close();
if (res.length != NB_ROWS_IN_BIG_BATCH) {
- if (i == NB_RETRIES-1) {
+ if (i == NB_RETRIES - 1) {
int lastRow = -1;
for (Result result : res) {
int currentRow = Bytes.toInt(result.getRow());
@@ -465,8 +466,9 @@ public class TestReplicationSmallTests e
lastRow = currentRow;
}
LOG.error("Last row: " + lastRow);
- fail("Waited too much time for normal batch replication, "
- + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH);
+ fail("Waited too much time for normal batch replication, " +
+ res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
+ (System.currentTimeMillis() - start) + "ms");
} else {
LOG.info("Only got " + res.length + " rows");
Thread.sleep(SLEEP_TIME);