You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/31 05:15:03 UTC
hbase git commit: HBASE-16531 Move cell block related code out of
IPCUtil
Repository: hbase
Updated Branches:
refs/heads/master ea1552270 -> 647a65ce0
HBASE-16531 Move cell block related code out of IPCUtil
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/647a65ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/647a65ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/647a65ce
Branch: refs/heads/master
Commit: 647a65ce01d695bd181c8c5029c4ddd63cab9692
Parents: ea15522
Author: zhangduo <zh...@apache.org>
Authored: Tue Aug 30 18:26:42 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Aug 31 13:09:22 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/ipc/AbstractRpcClient.java | 34 +--
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 8 +-
.../hadoop/hbase/ipc/CellBlockBuilder.java | 229 ++++++++++++++++
.../ipc/CellScannerButNoCodecException.java | 31 +++
.../org/apache/hadoop/hbase/ipc/IPCUtil.java | 272 +++++--------------
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 33 +--
.../hadoop/hbase/ipc/TestCellBlockBuilder.java | 196 +++++++++++++
.../apache/hadoop/hbase/ipc/TestIPCUtil.java | 185 ++-----------
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 10 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 18 --
10 files changed, 549 insertions(+), 467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 3d3339a..4d0d16b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -26,10 +26,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import org.apache.commons.logging.Log;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -64,7 +61,7 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final MetricsConnection metrics;
protected UserProvider userProvider;
- protected final IPCUtil ipcUtil;
+ protected final CellBlockBuilder cellBlockBuilder;
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment.
@@ -98,7 +95,7 @@ public abstract class AbstractRpcClient implements RpcClient {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
- this.ipcUtil = new IPCUtil(conf);
+ this.cellBlockBuilder = new CellBlockBuilder(conf);
this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.conf = conf;
@@ -293,33 +290,6 @@ public abstract class AbstractRpcClient implements RpcClient {
}
/**
- * Takes an Exception and the address we were trying to connect to and return an IOException with
- * the input exception as the cause. The new exception provides the stack trace of the place where
- * the exception is thrown and some extra diagnostics information. If the exception is
- * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
- * an IOException.
- * @param addr target address
- * @param exception the relevant exception
- * @return an exception to throw
- */
- protected IOException wrapException(InetSocketAddress addr, Exception exception) {
- if (exception instanceof ConnectException) {
- // connection refused; include the host:port in the error
- return (ConnectException) new ConnectException("Call to " + addr
- + " failed on connection exception: " + exception).initCause(exception);
- } else if (exception instanceof SocketTimeoutException) {
- return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
- + " failed because " + exception).initCause(exception);
- } else if (exception instanceof ConnectionClosingException) {
- return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
- + " failed on local exception: " + exception).initCause(exception);
- } else {
- return (IOException) new IOException("Call to " + addr + " failed on local exception: "
- + exception).initCause(exception);
- }
- }
-
- /**
* Blocking rpc channel that goes via hbase rpc.
*/
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index 3d343b4..e368c43 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -260,11 +260,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
- throw wrapException(addr, (Exception) e.getCause());
+ throw IPCUtil.wrapException(addr, (Exception) e.getCause());
}
} catch (TimeoutException e) {
CallTimeoutException cte = new CallTimeoutException(promise.toString());
- throw wrapException(addr, cte);
+ throw IPCUtil.wrapException(addr, cte);
}
}
@@ -359,7 +359,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws java.io.IOException on error on creation cell scanner
*/
public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
- return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
+ return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
}
/**
@@ -370,7 +370,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws java.io.IOException if block creation fails
*/
public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
- return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
+ return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells);
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
new file mode 100644
index 0000000..072a490
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.io.ByteBufferInputStream;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Helper class for building cell block.
+ */
+@InterfaceAudience.Private
+public class CellBlockBuilder {
+
+ // LOG is being used in TestCellBlockBuilder
+ static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
+
+ private final Configuration conf;
+ /**
+ * How much we think the decompressor will expand the original compressed content.
+ */
+ private final int cellBlockDecompressionMultiplier;
+
+ private final int cellBlockBuildingInitialBufferSize;
+
+ public CellBlockBuilder(final Configuration conf) {
+ this.conf = conf;
+ this.cellBlockDecompressionMultiplier = conf
+ .getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+
+ // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
+ // #buildCellBlock.
+ this.cellBlockBuildingInitialBufferSize = ClassSize
+ .align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
+ * @param codec to use for encoding
+ * @param compressor to use for encoding
+ * @param cellScanner to encode
+ * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+ * been flipped and is ready for reading. Use limit to find total size.
+ * @throws IOException if encoding the cells fails
+ */
+ public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+ final CellScanner cellScanner) throws IOException {
+ if (cellScanner == null) {
+ return null;
+ }
+ if (codec == null) {
+ throw new CellScannerButNoCodecException();
+ }
+ int bufferSize = this.cellBlockBuildingInitialBufferSize;
+ ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
+ encodeCellsTo(baos, cellScanner, codec, compressor);
+ if (LOG.isTraceEnabled()) {
+ if (bufferSize < baos.size()) {
+ LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+ + "; up hbase.ipc.cellblock.building.initial.buffersize?");
+ }
+ }
+ ByteBuffer bb = baos.getByteBuffer();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ if (!bb.hasRemaining()) return null;
+ return bb;
+ }
+
+ private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
+ CompressionCodec compressor) throws IOException {
+ OutputStream os = bbos;
+ Compressor poolCompressor = null;
+ try {
+ if (compressor != null) {
+ if (compressor instanceof Configurable) {
+ ((Configurable) compressor).setConf(this.conf);
+ }
+ poolCompressor = CodecPool.getCompressor(compressor);
+ os = compressor.createOutputStream(os, poolCompressor);
+ }
+ Codec.Encoder encoder = codec.getEncoder(os);
+ while (cellScanner.advance()) {
+ encoder.write(cellScanner.current());
+ }
+ encoder.flush();
+ } catch (BufferOverflowException e) {
+ throw new DoNotRetryIOException(e);
+ } finally {
+ os.close();
+ if (poolCompressor != null) {
+ CodecPool.returnCompressor(poolCompressor);
+ }
+ }
+ }
+
+ /**
+ * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+ * <code>compressor</code>.
+ * @param codec to use for encoding
+ * @param compressor to use for encoding
+ * @param cellScanner to encode
+ * @param pool Pool of ByteBuffers to make use of.
+ * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
+ * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
+ * been flipped and is ready for reading. Use limit to find total size. If
+ * <code>pool</code> was not null, then this returned ByteBuffer came from there and
+ * should be returned to the pool when done.
+ * @throws IOException if encoding the cells fails
+ */
+ public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
+ CellScanner cellScanner, ByteBufferPool pool) throws IOException {
+ if (cellScanner == null) {
+ return null;
+ }
+ if (codec == null) {
+ throw new CellScannerButNoCodecException();
+ }
+ assert pool != null;
+ ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
+ encodeCellsTo(bbos, cellScanner, codec, compressor);
+ if (bbos.size() == 0) {
+ bbos.releaseResources();
+ return null;
+ }
+ return bbos;
+ }
+
+ /**
+ * @param codec to use for cellblock
+ * @param cellBlock to decode
+ * @return CellScanner to work against the content of <code>cellBlock</code>
+ * @throws IOException if decoding fails
+ */
+ public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+ final byte[] cellBlock) throws IOException {
+ // Use this method from Client side to create the CellScanner
+ ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
+ if (compressor != null) {
+ cellBlockBuf = decompress(compressor, cellBlockBuf);
+ }
+ // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
+ // make Cells directly over the passed BB. This method is called at client side and we don't
+ // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
+ // of the Cells at user's app level will make it not possible to GC the response byte[]
+ return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
+ }
+
+ /**
+ * @param codec to use for cellblock
+ * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
+ * position()'ed at the start of the cell block and limit()'ed at the end.
+ * @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
+ * out of the CellScanner will share the same ByteBuffer being passed.
+ * @throws IOException if cell decoding fails
+ */
+ public CellScanner createCellScannerReusingBuffers(final Codec codec,
+ final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
+ // Use this method from HRS to create the CellScanner
+ // If compressed, decompress it first before passing it on else we will leak compression
+ // resources if the stream is not closed properly after we let it out.
+ if (compressor != null) {
+ cellBlock = decompress(compressor, cellBlock);
+ }
+ return codec.getDecoder(cellBlock);
+ }
+
+ private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
+ throws IOException {
+ // GZIPCodec fails w/ NPE if no configuration.
+ if (compressor instanceof Configurable) {
+ ((Configurable) compressor).setConf(this.conf);
+ }
+ Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
+ CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
+ poolDecompressor);
+ ByteBufferOutputStream bbos;
+ try {
+ // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
+ // TODO: Reuse buffers.
+ bbos = new ByteBufferOutputStream(
+ cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
+ IOUtils.copy(cis, bbos);
+ bbos.close();
+ cellBlock = bbos.getByteBuffer();
+ } finally {
+ CodecPool.returnDecompressor(poolDecompressor);
+ }
+ return cellBlock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
new file mode 100644
index 0000000..ffd27b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellScannerButNoCodecException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown if there is a cell scanner but no codec to encode it with.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class CellScannerButNoCodecException extends HBaseIOException {
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 74f934c..c18bd7e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -20,221 +20,25 @@ package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
+
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.BufferOverflowException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.io.ByteBufferInputStream;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.ipc.RemoteException;
/**
* Utility to help ipc'ing.
*/
@InterfaceAudience.Private
public class IPCUtil {
- // LOG is being used in TestIPCUtil
- public static final Log LOG = LogFactory.getLog(IPCUtil.class);
- /**
- * How much we think the decompressor will expand the original compressed content.
- */
- private final int cellBlockDecompressionMultiplier;
- private final int cellBlockBuildingInitialBufferSize;
- private final Configuration conf;
-
- public IPCUtil(final Configuration conf) {
- super();
- this.conf = conf;
- this.cellBlockDecompressionMultiplier =
- conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
-
- // Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
- // #buildCellBlock.
- this.cellBlockBuildingInitialBufferSize =
- ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
- }
-
- /**
- * Thrown if a cellscanner but no codec to encode it with.
- */
- public static class CellScannerButNoCodecException extends HBaseIOException {};
-
- /**
- * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
- * <code>compressor</code>.
- * @param codec to use for encoding
- * @param compressor to use for encoding
- * @param cellScanner to encode
- * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
- * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
- * flipped and is ready for reading. Use limit to find total size.
- * @throws IOException if encoding the cells fail
- */
- @SuppressWarnings("resource")
- public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
- final CellScanner cellScanner) throws IOException {
- if (cellScanner == null) {
- return null;
- }
- if (codec == null) {
- throw new CellScannerButNoCodecException();
- }
- int bufferSize = this.cellBlockBuildingInitialBufferSize;
- ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
- encodeCellsTo(baos, cellScanner, codec, compressor);
- if (LOG.isTraceEnabled()) {
- if (bufferSize < baos.size()) {
- LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
- + "; up hbase.ipc.cellblock.building.initial.buffersize?");
- }
- }
- ByteBuffer bb = baos.getByteBuffer();
- // If no cells, don't mess around. Just return null (could be a bunch of existence checking
- // gets or something -- stuff that does not return a cell).
- if (!bb.hasRemaining()) return null;
- return bb;
- }
-
- private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
- CompressionCodec compressor) throws IOException {
- OutputStream os = bbos;
- Compressor poolCompressor = null;
- try {
- if (compressor != null) {
- if (compressor instanceof Configurable) {
- ((Configurable) compressor).setConf(this.conf);
- }
- poolCompressor = CodecPool.getCompressor(compressor);
- os = compressor.createOutputStream(os, poolCompressor);
- }
- Codec.Encoder encoder = codec.getEncoder(os);
- while (cellScanner.advance()) {
- encoder.write(cellScanner.current());
- }
- encoder.flush();
- } catch (BufferOverflowException e) {
- throw new DoNotRetryIOException(e);
- } finally {
- os.close();
- if (poolCompressor != null) {
- CodecPool.returnCompressor(poolCompressor);
- }
- }
- }
-
- /**
- * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
- * <code>compressor</code>.
- * @param codec to use for encoding
- * @param compressor to use for encoding
- * @param cellScanner to encode
- * @param pool Pool of ByteBuffers to make use of.
- * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
- * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
- * flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not
- * null, then this returned ByteBuffer came from there and should be returned to the pool when
- * done.
- * @throws IOException if encoding the cells fail
- */
- @SuppressWarnings("resource")
- public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
- CellScanner cellScanner, ByteBufferPool pool) throws IOException {
- if (cellScanner == null) {
- return null;
- }
- if (codec == null) {
- throw new CellScannerButNoCodecException();
- }
- assert pool != null;
- ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
- encodeCellsTo(bbos, cellScanner, codec, compressor);
- if (bbos.size() == 0) {
- bbos.releaseResources();
- return null;
- }
- return bbos;
- }
-
- /**
- * @param codec to use for cellblock
- * @param cellBlock to encode
- * @return CellScanner to work against the content of <code>cellBlock</code>
- * @throws IOException if encoding fails
- */
- public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
- final byte[] cellBlock) throws IOException {
- // Use this method from Client side to create the CellScanner
- ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
- if (compressor != null) {
- cellBlockBuf = decompress(compressor, cellBlockBuf);
- }
- // Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
- // make Cells directly over the passed BB. This method is called at client side and we don't
- // want the Cells to share the same byte[] where the RPC response is being read. Caching of any
- // of the Cells at user's app level will make it not possible to GC the response byte[]
- return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
- }
-
- /**
- * @param codec to use for cellblock
- * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
- * position()'ed at the start of the cell block and limit()'ed at the end.
- * @return CellScanner to work against the content of <code>cellBlock</code>.
- * All cells created out of the CellScanner will share the same ByteBuffer being passed.
- * @throws IOException if cell encoding fails
- */
- public CellScanner createCellScannerReusingBuffers(final Codec codec,
- final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
- // Use this method from HRS to create the CellScanner
- // If compressed, decompress it first before passing it on else we will leak compression
- // resources if the stream is not closed properly after we let it out.
- if (compressor != null) {
- cellBlock = decompress(compressor, cellBlock);
- }
- return codec.getDecoder(cellBlock);
- }
-
- private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
- throws IOException {
- // GZIPCodec fails w/ NPE if no configuration.
- if (compressor instanceof Configurable) {
- ((Configurable) compressor).setConf(this.conf);
- }
- Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
- CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
- poolDecompressor);
- ByteBufferOutputStream bbos;
- try {
- // TODO: This is ugly. The buffer will be resized on us if we guess wrong.
- // TODO: Reuse buffers.
- bbos = new ByteBufferOutputStream(
- cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
- IOUtils.copy(cis, bbos);
- bbos.close();
- cellBlock = bbos.getByteBuffer();
- } finally {
- CodecPool.returnDecompressor(poolDecompressor);
- }
- return cellBlock;
- }
/**
* Write out header, param, and cell block if there is one.
@@ -246,10 +50,9 @@ public class IPCUtil {
* @throws IOException if write action fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
- final ByteBuffer cellBlock)
- throws IOException {
+ final ByteBuffer cellBlock) throws IOException {
// Must calculate total size and write that first so other side can read it all in in one
- // swoop. This is dictated by how the server is currently written. Server needs to change
+ // swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) {
@@ -259,8 +62,7 @@ public class IPCUtil {
}
private static int write(final OutputStream dos, final Message header, final Message param,
- final ByteBuffer cellBlock, final int totalSize)
- throws IOException {
+ final ByteBuffer cellBlock, final int totalSize) throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
@@ -278,9 +80,9 @@ public class IPCUtil {
/**
* @return Size on the wire when the two messages are written with writeDelimitedTo
*/
- public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
int totalSize = 0;
- for (Message m: messages) {
+ for (Message m : messages) {
if (m == null) {
continue;
}
@@ -290,4 +92,52 @@ public class IPCUtil {
Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
return totalSize;
}
+
+ /**
+ * @return True if the exception is a fatal connection exception.
+ */
+ public static boolean isFatalConnectionException(final ExceptionResponse e) {
+ return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
+ }
+
+ /**
+ * @param e exception to be wrapped
+ * @return RemoteException made from passed <code>e</code>
+ */
+ public static RemoteException createRemoteException(final ExceptionResponse e) {
+ String innerExceptionClassName = e.getExceptionClassName();
+ boolean doNotRetry = e.getDoNotRetry();
+ return e.hasHostname() ?
+ // If a hostname then add it to the RemoteWithExtrasException
+ new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
+ e.getPort(), doNotRetry)
+ : new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
+ }
+
+ /**
+ * Takes an Exception and the address we were trying to connect to and return an IOException with
+ * the input exception as the cause. The new exception provides the stack trace of the place where
+ * the exception is thrown and some extra diagnostics information. If the exception is
+ * ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
+ * an IOException.
+ * @param addr target address
+ * @param exception the relevant exception
+ * @return an exception to throw
+ */
+ public static IOException wrapException(InetSocketAddress addr, Exception exception) {
+ if (exception instanceof ConnectException) {
+ // connection refused; include the host:port in the error
+ return (ConnectException) new ConnectException(
+ "Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
+ } else if (exception instanceof SocketTimeoutException) {
+ return (SocketTimeoutException) new SocketTimeoutException(
+ "Call to " + addr + " failed because " + exception).initCause(exception);
+ } else if (exception instanceof ConnectionClosingException) {
+ return (ConnectionClosingException) new ConnectionClosingException(
+ "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
+ } else {
+ return (IOException) new IOException(
+ "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index 37b9afd..03b2953 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -896,7 +896,7 @@ public class RpcClientImpl extends AbstractRpcClient {
}
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
- ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);
+ ByteBuffer cellBlock = cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells);
if (cellBlock != null) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
cellBlockBuilder.setLength(cellBlock.limit());
@@ -997,12 +997,12 @@ public class RpcClientImpl extends AbstractRpcClient {
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
- RemoteException re = createRemoteException(exceptionResponse);
+ RemoteException re = IPCUtil.createRemoteException(exceptionResponse);
call.setException(re);
call.callStats.setResponseSizeBytes(totalSize);
call.callStats.setCallTimeMs(
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
- if (isFatalConnectionException(exceptionResponse)) {
+ if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
markClosed(re);
}
} else {
@@ -1017,7 +1017,7 @@ public class RpcClientImpl extends AbstractRpcClient {
int size = responseHeader.getCellBlockMeta().getLength();
byte [] cellBlock = new byte[size];
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
- cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
+ cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
}
call.setResponse(value, cellBlockScanner);
call.callStats.setResponseSizeBytes(totalSize);
@@ -1044,29 +1044,6 @@ public class RpcClientImpl extends AbstractRpcClient {
}
}
- /**
- * @return True if the exception is a fatal connection exception.
- */
- private boolean isFatalConnectionException(final ExceptionResponse e) {
- return e.getExceptionClassName().
- equals(FatalConnectionException.class.getName());
- }
-
- /**
- * @param e exception to be wrapped
- * @return RemoteException made from passed <code>e</code>
- */
- private RemoteException createRemoteException(final ExceptionResponse e) {
- String innerExceptionClassName = e.getExceptionClassName();
- boolean doNotRetry = e.getDoNotRetry();
- return e.hasHostname()?
- // If a hostname then add it to the RemoteWithExtrasException
- new RemoteWithExtrasException(innerExceptionClassName,
- e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
- new RemoteWithExtrasException(innerExceptionClassName,
- e.getStackTrace(), doNotRetry);
- }
-
protected synchronized boolean markClosed(IOException e) {
if (e == null){
throw new NullPointerException();
@@ -1322,7 +1299,7 @@ public class RpcClientImpl extends AbstractRpcClient {
throw call.error;
}
// local exception
- throw wrapException(addr, call.error);
+ throw IPCUtil.wrapException(addr, call.error);
}
return call;
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
new file mode 100644
index 0000000..b780b95
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestCellBlockBuilder.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.commons.lang.time.StopWatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Round-trip tests for {@link CellBlockBuilder}, plus a {@code main} that times the round trip.
+ */
+@Category({ ClientTests.class, SmallTests.class })
+public class TestCellBlockBuilder {
+
+ private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class);
+
+ CellBlockBuilder builder;
+
+ @Before
+ public void before() {
+ this.builder = new CellBlockBuilder(new Configuration());
+ }
+
+ @Test
+ public void testBuildCellBlock() throws IOException {
+ doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null);
+ doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new DefaultCodec());
+ doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new GzipCodec());
+ }
+
+ static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
+ final CompressionCodec compressor) throws IOException {
+ doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
+ }
+
+ /**
+ * Encodes {@code count} cells into a cell block, decodes it, and checks the count round-trips.
+ */
+ static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
+ final CompressionCodec compressor, final int count, final int size, final boolean sized)
+ throws IOException {
+ Cell[] cells = getCells(count, size);
+ CellScanner cellScanner = sized ? getSizedCellScanner(cells)
+ : CellUtil.createCellScanner(Arrays.asList(cells).iterator());
+ ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
+ cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
+ int i = 0;
+ while (cellScanner.advance()) {
+ i++;
+ }
+ assertEquals(count, i);
+ }
+
+ static CellScanner getSizedCellScanner(final Cell[] cells) {
+ int size = -1; // NOTE(review): seeded with -1 rather than 0 -- confirm this is intended
+ for (Cell cell : cells) {
+ size += CellUtil.estimatedSerializedSizeOf(cell);
+ }
+ final int totalSize = ClassSize.align(size);
+ final CellScanner cellScanner = CellUtil.createCellScanner(cells);
+ return new SizedCellScanner() {
+ @Override
+ public long heapSize() {
+ return totalSize;
+ }
+
+ @Override
+ public Cell current() {
+ return cellScanner.current();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return cellScanner.advance();
+ }
+ };
+ }
+
+ static Cell[] getCells(final int howMany) {
+ return getCells(howMany, 1024);
+ }
+
+ static Cell[] getCells(final int howMany, final int valueSize) {
+ Cell[] cells = new Cell[howMany];
+ byte[] value = new byte[valueSize];
+ for (int i = 0; i < howMany; i++) {
+ byte[] index = Bytes.toBytes(i);
+ KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
+ cells[i] = kv;
+ }
+ return cells;
+ }
+
+ private static final String COUNT = "--count=";
+ private static final String SIZE = "--size=";
+
+ /**
+ * Prints usage and then exits w/ passed <code>errCode</code>
+ * @param errCode exit status handed to {@link System#exit(int)}
+ */
+ private static void usage(final int errCode) {
+ System.out.println("Usage: TestCellBlockBuilder [options]");
+ System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
+ System.out.println(" --count Count of Cells");
+ System.out.println(" --size Size of Cell values");
+ System.out.println("Example: TestCellBlockBuilder --count=1024 --size=1024");
+ System.exit(errCode);
+ }
+
+ private static void timerTests(final CellBlockBuilder util, final int count, final int size,
+ final Codec codec, final CompressionCodec compressor) throws IOException {
+ final int cycles = 1000;
+ StopWatch timer = new StopWatch();
+ timer.start();
+ for (int i = 0; i < cycles; i++) {
+ doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, false);
+ }
+ timer.stop();
+ LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + ", count="
+ + count + ", size=" + size + ", took=" + timer.getTime() + "ms");
+ timer.reset();
+ timer.start();
+ for (int i = 0; i < cycles; i++) {
+ doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, true);
+ }
+ timer.stop();
+ LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + ", count="
+ + count + ", size=" + size + ", took=" + timer.getTime() + "ms");
+ }
+
+ /**
+ * For running a few tests of methods herein.
+ * @param args optional {@code --count=N} and {@code --size=N}; anything else prints usage
+ * @throws IOException propagated from the cell block round trip
+ */
+ public static void main(String[] args) throws IOException {
+ int count = 1024;
+ int size = 10240;
+ for (String arg : args) {
+ if (arg.startsWith(COUNT)) {
+ count = Integer.parseInt(arg.substring(COUNT.length()));
+ } else if (arg.startsWith(SIZE)) {
+ size = Integer.parseInt(arg.substring(SIZE.length()));
+ } else {
+ usage(1);
+ }
+ }
+ CellBlockBuilder util = new CellBlockBuilder(HBaseConfiguration.create());
+ ((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL);
+ timerTests(util, count, size, new KeyValueCodec(), null);
+ timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
+ timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
index c90b275..ef534c0 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
@@ -17,181 +17,28 @@
*/
package org.apache.hadoop.hbase.ipc;
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+import static org.junit.Assert.assertTrue;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
-import org.apache.commons.lang.time.StopWatch;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.codec.KeyValueCodec;
-import org.apache.hadoop.hbase.io.SizedCellScanner;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ClassSize;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.log4j.Level;
-import org.junit.Before;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-@Category({ClientTests.class, SmallTests.class})
public class TestIPCUtil {
- private static final Log LOG = LogFactory.getLog(TestIPCUtil.class);
-
- IPCUtil util;
- @Before
- public void before() {
- this.util = new IPCUtil(new Configuration());
- }
-
@Test
- public void testBuildCellBlock() throws IOException {
- doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
- doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec());
- doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec());
- }
-
- static void doBuildCellBlockUndoCellBlock(final IPCUtil util,
- final Codec codec, final CompressionCodec compressor)
- throws IOException {
- doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
- }
-
- static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec,
- final CompressionCodec compressor, final int count, final int size, final boolean sized)
- throws IOException {
- Cell [] cells = getCells(count, size);
- CellScanner cellScanner = sized? getSizedCellScanner(cells):
- CellUtil.createCellScanner(Arrays.asList(cells).iterator());
- ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
- cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
- int i = 0;
- while (cellScanner.advance()) {
- i++;
- }
- assertEquals(count, i);
- }
-
- static CellScanner getSizedCellScanner(final Cell [] cells) {
- int size = -1;
- for (Cell cell: cells) {
- size += CellUtil.estimatedSerializedSizeOf(cell);
- }
- final int totalSize = ClassSize.align(size);
- final CellScanner cellScanner = CellUtil.createCellScanner(cells);
- return new SizedCellScanner() {
- @Override
- public long heapSize() {
- return totalSize;
- }
-
- @Override
- public Cell current() {
- return cellScanner.current();
- }
-
- @Override
- public boolean advance() throws IOException {
- return cellScanner.advance();
- }
- };
- }
-
- static Cell [] getCells(final int howMany) {
- return getCells(howMany, 1024);
- }
-
- static Cell [] getCells(final int howMany, final int valueSize) {
- Cell [] cells = new Cell[howMany];
- byte [] value = new byte[valueSize];
- for (int i = 0; i < howMany; i++) {
- byte [] index = Bytes.toBytes(i);
- KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
- cells[i] = kv;
- }
- return cells;
- }
-
- private static final String COUNT = "--count=";
- private static final String SIZE = "--size=";
-
- /**
- * Prints usage and then exits w/ passed <code>errCode</code>
- * @param errCode
- */
- private static void usage(final int errCode) {
- System.out.println("Usage: IPCUtil [options]");
- System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
- System.out.println(" --count Count of Cells");
- System.out.println(" --size Size of Cell values");
- System.out.println("Example: IPCUtil --count=1024 --size=1024");
- System.exit(errCode);
- }
-
- private static void timerTests(final IPCUtil util, final int count, final int size,
- final Codec codec, final CompressionCodec compressor)
- throws IOException {
- final int cycles = 1000;
- StopWatch timer = new StopWatch();
- timer.start();
- for (int i = 0; i < cycles; i++) {
- timerTest(util, timer, count, size, codec, compressor, false);
- }
- timer.stop();
- LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false +
- ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
- timer.reset();
- timer.start();
- for (int i = 0; i < cycles; i++) {
- timerTest(util, timer, count, size, codec, compressor, true);
- }
- timer.stop();
- LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true +
- ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
- }
-
- private static void timerTest(final IPCUtil util, final StopWatch timer, final int count,
- final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
- throws IOException {
- doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
- }
-
- /**
- * For running a few tests of methods herein.
- * @param args
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- int count = 1024;
- int size = 10240;
- for (String arg: args) {
- if (arg.startsWith(COUNT)) {
- count = Integer.parseInt(arg.replace(COUNT, ""));
- } else if (arg.startsWith(SIZE)) {
- size = Integer.parseInt(arg.replace(SIZE, ""));
- } else {
- usage(1);
- }
- }
- IPCUtil util = new IPCUtil(HBaseConfiguration.create());
- ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
- timerTests(util, count, size, new KeyValueCodec(), null);
- timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
- timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
+ public void testWrapException() throws Exception {
+ final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
+ assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException);
+ assertTrue(
+ wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException);
+ assertTrue(wrapException(address, new ConnectionClosingException(
+ "Test IPCUtil#wrapException")) instanceof ConnectionClosingException);
+ // Any other exception type must survive as the cause of the returned exception.
+ assertTrue(wrapException(address, new CallTimeoutException("Test IPCUtil#wrapException"))
+ .getCause() instanceof CallTimeoutException);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 4b27924..759da82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -183,7 +183,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
- private final IPCUtil ipcUtil;
+ private final CellBlockBuilder cellBlockBuilder;
private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@@ -434,14 +434,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0;
if (reservoir != null) {
- this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec,
+ this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, reservoir);
if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size();
}
} else {
- ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec,
+ ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
this.connection.compressionCodec, cells);
if (b != null) {
cellBlockSize = b.remaining();
@@ -1861,7 +1861,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
- cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
+ cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();
@@ -2058,7 +2058,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
- this.ipcUtil = new IPCUtil(conf);
+ this.cellBlockBuilder = new CellBlockBuilder(conf);
// Create the responder here
http://git-wip-us.apache.org/repos/asf/hbase/blob/647a65ce/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 4cfa25c..be5ad56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -35,10 +35,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
@@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
-import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -394,19 +391,4 @@ public abstract class AbstractTestIPC {
rpcServer.stop();
}
}
-
- @Test
- public void testWrapException() throws Exception {
- AbstractRpcClient client =
- (AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
- final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
- assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
- assertTrue(client.wrapException(address,
- new SocketTimeoutException()) instanceof SocketTimeoutException);
- assertTrue(client.wrapException(address, new ConnectionClosingException(
- "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
- assertTrue(client
- .wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
- .getCause() instanceof CallTimeoutException);
- }
}