You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/11/19 16:39:35 UTC
svn commit: r1543458 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-hadoop1-compat/src/main/ja...
Author: stack
Date: Tue Nov 19 15:39:35 2013
New Revision: 1543458
URL: http://svn.apache.org/r1543458
Log:
HBASE-5945 Reduce buffer copies in IPC server response path
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Tue Nov 19 15:39:35 2013
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -44,7 +43,6 @@ import org.apache.hadoop.io.compress.Com
import org.apache.hadoop.io.compress.Decompressor;
import com.google.common.base.Preconditions;
-import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
@@ -117,10 +115,15 @@ class IPCUtil {
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
+ int count = 0;
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
+ count++;
}
encoder.flush();
+ // If no cells, don't mess around. Just return null (could be a bunch of existence checking
+ // gets or something -- stuff that does not return a cell).
+ if (count == 0) return null;
} finally {
os.close();
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
@@ -187,24 +190,23 @@ class IPCUtil {
}
/**
- * Write out header, param, and cell block if there to a {@link ByteBufferOutputStream} sized
- * to hold these elements.
- * @param header
- * @param param
- * @param cellBlock
- * @return A {@link ByteBufferOutputStream} filled with the content of the passed in
- * <code>header</code>, <code>param</code>, and <code>cellBlock</code>.
+ * @param m Message to serialize delimited; i.e. w/ a vint of its size preceding its
+ * serialization.
+ * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
* @throws IOException
*/
- static ByteBufferOutputStream write(final Message header, final Message param,
- final ByteBuffer cellBlock)
- throws IOException {
- int totalSize = getTotalSizeWhenWrittenDelimited(header, param);
- if (cellBlock != null) totalSize += cellBlock.limit();
- ByteBufferOutputStream bbos = new ByteBufferOutputStream(totalSize);
- write(bbos, header, param, cellBlock, totalSize);
- bbos.close();
- return bbos;
+ static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+   // A null message yields a null buffer.
+   if (m == null) {
+     return null;
+   }
+   final int messageLength = m.getSerializedSize();
+   final int prefixLength = CodedOutputStream.computeRawVarint32Size(messageLength);
+   // Size the array for the vint prefix plus the message, and give it to COS directly so COS
+   // does not create a buffer of its own (which it does when writing to a stream).
+   final byte [] delimited = new byte[prefixLength + messageLength];
+   final CodedOutputStream out = CodedOutputStream.newInstance(delimited);
+   // Emits the vint length preamble followed by the serialized message.
+   out.writeMessageNoTag(m);
+   out.flush();
+   // Verifies the array was filled exactly.
+   out.checkNoSpaceLeft();
+   return ByteBuffer.wrap(delimited);
+ }
/**
@@ -230,8 +232,9 @@ class IPCUtil {
private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize)
throws IOException {
- // I confirmed toBytes does same as say DataOutputStream#writeInt.
+ // I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize));
+ // This allocates a buffer that is the size of the message internally.
header.writeDelimitedTo(dos);
if (param != null) param.writeDelimitedTo(dos);
if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
@@ -240,20 +243,6 @@ class IPCUtil {
}
/**
- * @param in Stream cue'd up just before a delimited message
- * @return Bytes that hold the bytes that make up the message read from <code>in</code>
- * @throws IOException
- */
- static byte [] getDelimitedMessageBytes(final DataInputStream in) throws IOException {
- byte b = in.readByte();
- int size = CodedInputStream.readRawVarint32(b, in);
- // Allocate right-sized buffer rather than let pb allocate its default minimum 4k.
- byte [] bytes = new byte[size];
- IOUtils.readFully(in, bytes);
- return bytes;
- }
-
- /**
* Read in chunks of 8K (HBASE-7239)
* @param in
* @param dest
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Tue Nov 19 15:39:35 2013
@@ -175,10 +175,13 @@ public final class ProtobufUtil {
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setExists(true);
+ builder.setAssociatedCellCount(0);
EMPTY_RESULT_PB_EXISTS_TRUE = builder.build();
builder.clear();
+
builder.setExists(false);
+ builder.setAssociatedCellCount(0);
EMPTY_RESULT_PB_EXISTS_FALSE = builder.build();
builder.clear();
@@ -1192,19 +1195,11 @@ public final class ProtobufUtil {
* @return the converted protocol buffer Result
*/
public static ClientProtos.Result toResultNoData(final Result result) {
- if (result.getExists() != null){
- return result.getExists() ? EMPTY_RESULT_PB_EXISTS_TRUE : EMPTY_RESULT_PB_EXISTS_FALSE;
- }
-
+ if (result.getExists() != null) return toResult(result.getExists());
int size = result.size();
-
- if (size == 0){
- return EMPTY_RESULT_PB;
- }
-
+ if (size == 0) return EMPTY_RESULT_PB;
ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
builder.setAssociatedCellCount(size);
-
return builder.build();
}
Modified: hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java (original)
+++ hbase/trunk/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java Tue Nov 19 15:39:35 2013
@@ -62,7 +62,7 @@ public interface MetricsHBaseServerSourc
void authenticationFailure();
- void sentBytes(int count);
+ void sentBytes(long count);
void receivedBytes(int count);
Modified: hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (original)
+++ hbase/trunk/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java Tue Nov 19 15:39:35 2013
@@ -86,7 +86,7 @@ public class MetricsHBaseServerSourceImp
}
@Override
- public void sentBytes(int count) {
+ public void sentBytes(long count) {
this.sentBytes.incr(count);
}
Modified: hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java (original)
+++ hbase/trunk/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java Tue Nov 19 15:39:35 2013
@@ -89,7 +89,7 @@ public class MetricsHBaseServerSourceImp
}
@Override
- public void sentBytes(int count) {
+ public void sentBytes(long count) {
this.sentBytes.incr(count);
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java?rev=1543458&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BufferChain.java Tue Nov 19 15:39:35 2013
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Chain of ByteBuffers.
+ * Used for writing out an array of byte buffers. Writes in chunks.
+ */
+@InterfaceAudience.Private
+class BufferChain {
+ private static final ByteBuffer [] FOR_TOARRAY_TYPE = new ByteBuffer[0];
+ private final ByteBuffer[] buffers;
+ private int remaining = 0;
+ private int bufferOffset = 0;
+
+ BufferChain(ByteBuffer ... buffers) {
+   // Null entries are allowed in the argument list; only the non-null buffers are kept.
+   List<ByteBuffer> nonNull = new ArrayList<ByteBuffer>(buffers.length);
+   int total = 0;
+   for (ByteBuffer candidate : buffers) {
+     if (candidate != null) {
+       nonNull.add(candidate);
+       total += candidate.remaining();
+     }
+   }
+   // Total bytes still to be written across the kept buffers.
+   this.remaining = total;
+   this.buffers = nonNull.toArray(FOR_TOARRAY_TYPE);
+ }
+
+ /**
+  * Expensive. Makes a new array to hold a copy of what is in the contained ByteBuffers.
+  * Each buffer contributes the bytes between its position and its limit. The copy is read
+  * through a duplicate, so the buffers' own positions are not moved and buffers without an
+  * accessible backing array are handled too. Treat the chain as spent after this call rather
+  * than reusing it.
+  * @return A new byte array with the remaining content of all contained ByteBuffers.
+  * @throws IllegalAccessError if the chain has nothing remaining.
+  */
+ byte [] getBytes() {
+   if (!hasRemaining()) throw new IllegalAccessError();
+   byte [] bytes = new byte [this.remaining];
+   int offset = 0;
+   for (ByteBuffer bb: this.buffers) {
+     // Copy exactly what is left in this buffer. Using limit() as the length and capacity()
+     // as the stride is only right for a freshly wrapped, never-read buffer.
+     int length = bb.remaining();
+     bb.duplicate().get(bytes, offset, length);
+     offset += length;
+   }
+   return bytes;
+ }
+
+ /**
+  * @return True while the count of bytes still to be written is above zero.
+  */
+ boolean hasRemaining() {
+   return this.remaining > 0;
+ }
+
+ /**
+  * Write out our chain of buffers in chunks.
+  * <p>Selects a window of buffers starting at <code>bufferOffset</code> that together cover
+  * at most <code>chunkSize</code> bytes and passes that window to the channel in one gathering
+  * write. If the last selected buffer holds more than the chunk has room for, its limit is
+  * lowered for the duration of the write and put back afterwards.
+  * @param channel Where to write
+  * @param chunkSize Size of chunks to write.
+  * @return Amount written; 0 if no data was selected for this call, otherwise whatever the
+  * channel reported (which can be less than the chunk).
+  * @throws IOException propagated from the channel write
+  */
+ long write(GatheringByteChannel channel, int chunkSize) throws IOException {
+   int chunkRemaining = chunkSize;
+   ByteBuffer lastBuffer = null;
+   int bufCount = 0;
+   int restoreLimit = -1;
+
+   while (chunkRemaining > 0 && bufferOffset + bufCount < buffers.length) {
+     lastBuffer = buffers[bufferOffset + bufCount];
+     if (!lastBuffer.hasRemaining()) {
+       if (bufCount == 0) {
+         // Drained buffer at the head of the chain: move the window start past it for good.
+         bufferOffset++;
+       } else {
+         // Empty buffer after data has already been selected. Moving bufferOffset here would
+         // push the selected buffers out of the window and their bytes would never be sent,
+         // so keep it inside the window instead; it adds nothing to the write.
+         bufCount++;
+       }
+       continue;
+     }
+     bufCount++;
+     if (lastBuffer.remaining() > chunkRemaining) {
+       // Clip this buffer so the write stops at the chunk boundary; undone in the finally.
+       restoreLimit = lastBuffer.limit();
+       lastBuffer.limit(lastBuffer.position() + chunkRemaining);
+       chunkRemaining = 0;
+       break;
+     }
+     chunkRemaining -= lastBuffer.remaining();
+   }
+   if (chunkRemaining == chunkSize) {
+     assert !hasRemaining();
+     // no data left to write
+     return 0;
+   }
+   // Some bytes were selected, so at least one buffer was visited.
+   assert lastBuffer != null;
+   try {
+     long ret = channel.write(buffers, bufferOffset, bufCount);
+     if (ret > 0) {
+       remaining -= ret;
+     }
+     return ret;
+   } finally {
+     if (restoreLimit >= 0) {
+       lastBuffer.limit(restoreLimit);
+     }
+   }
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java Tue Nov 19 15:39:35 2013
@@ -47,7 +47,7 @@ public class MetricsHBaseServer {
source.authenticationSuccess();
}
- void sentBytes(int count) {
+ void sentBytes(long count) {
source.sentBytes(count);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Tue Nov 19 15:39:35 2013
@@ -35,6 +35,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -84,12 +85,11 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.hbase.security.SaslStatus;
import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -276,7 +276,10 @@ public class RpcServer implements RpcSer
protected Connection connection; // connection to client
protected long timestamp; // the time received when response is null
// the time served when response is not null
- protected ByteBuffer response; // the response for this call
+ /**
+ * Chain of buffers to send as response.
+ */
+ protected BufferChain response;
protected boolean delayResponse;
protected Responder responder;
protected boolean delayReturnValue; // if the return value should be
@@ -341,14 +344,14 @@ public class RpcServer implements RpcSer
}
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
- this.response = response;
+ this.response = new BufferChain(response);
}
protected synchronized void setResponse(Object m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
- ByteBufferOutputStream bbos = null;
+ BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
// Presume it a pb Message. Could be null.
@@ -380,42 +383,44 @@ public class RpcServer implements RpcSer
headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
}
Message header = headerBuilder.build();
- bbos = IPCUtil.write(header, result, cellBlock);
+
+ // Organize the response as a set of bytebuffers rather than collect it all together inside
+ // one big byte array; save on allocations.
+ ByteBuffer bbHeader = IPCUtil.getDelimitedMessageAsByteBuffer(header);
+ ByteBuffer bbResult = IPCUtil.getDelimitedMessageAsByteBuffer(result);
+ int totalSize = bbHeader.capacity() + (bbResult == null? 0: bbResult.limit()) +
+ (cellBlock == null? 0: cellBlock.limit());
+ ByteBuffer bbTotalSize = ByteBuffer.wrap(Bytes.toBytes(totalSize));
+ bc = new BufferChain(bbTotalSize, bbHeader, bbResult, cellBlock);
if (connection.useWrap) {
- wrapWithSasl(bbos);
+ bc = wrapWithSasl(bc);
}
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
- ByteBuffer bb = null;
- if (bbos != null) {
- // TODO: If SASL, maybe buffer already been flipped and written?
- bb = bbos.getByteBuffer();
- bb.position(0);
- }
- this.response = bb;
+ this.response = bc;
}
- private void wrapWithSasl(ByteBufferOutputStream response)
- throws IOException {
- if (connection.useSasl) {
- // getByteBuffer calls flip()
- ByteBuffer buf = response.getByteBuffer();
- byte[] token;
- // synchronization may be needed since there can be multiple Handler
- // threads using saslServer to wrap responses.
- synchronized (connection.saslServer) {
- token = connection.saslServer.wrap(buf.array(),
- buf.arrayOffset(), buf.remaining());
- }
- if (LOG.isDebugEnabled())
- LOG.debug("Adding saslServer wrapped token of size " + token.length
- + " as call response.");
- buf.clear();
- DataOutputStream saslOut = new DataOutputStream(response);
- saslOut.writeInt(token.length);
- saslOut.write(token, 0, token.length);
- }
+ private BufferChain wrapWithSasl(BufferChain bc)
+     throws IOException {
+   // Nothing to wrap, or the connection is not using SASL: hand the chain back untouched.
+   if (bc == null || !this.connection.useSasl) {
+     return bc;
+   }
+   // saslServer.wrap takes a byte array, so the whole chain has to be flattened into one;
+   // this is a full copy of the response.
+   final byte [] plain = bc.getBytes();
+   final byte [] wrapped;
+   // synchronization may be needed since there can be multiple Handler
+   // threads using saslServer to wrap responses.
+   synchronized (connection.saslServer) {
+     wrapped = connection.saslServer.wrap(plain, 0, plain.length);
+   }
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Adding saslServer wrapped token of size " + wrapped.length
+         + " as call response.");
+   }
+   // The wrapped response goes out as the token length followed by the token bytes.
+   final ByteBuffer lengthPrefix = ByteBuffer.wrap(Bytes.toBytes(wrapped.length));
+   final ByteBuffer tokenBody = ByteBuffer.wrap(wrapped);
+   return new BufferChain(lengthPrefix, tokenBody);
+ }
@Override
@@ -994,7 +999,7 @@ public class RpcServer implements RpcSer
//
// Send as much data as we can in the non-blocking fashion
//
- int numBytes = channelWrite(channel, call.response);
+ long numBytes = channelWrite(channel, call.response);
if (numBytes < 0) {
return true;
}
@@ -1347,6 +1352,7 @@ public class RpcServer implements RpcSer
}
}
}
+
/**
* No protobuf encoding of raw sasl messages
*/
@@ -2169,19 +2175,15 @@ public class RpcServer implements RpcSer
* buffer.
*
* @param channel writable byte channel to write to
- * @param buffer buffer to write
+ * @param bufferChain Chain of buffers to write
* @return number of bytes written
* @throws java.io.IOException e
* @see java.nio.channels.WritableByteChannel#write(java.nio.ByteBuffer)
*/
- protected int channelWrite(WritableByteChannel channel,
- ByteBuffer buffer) throws IOException {
-
- int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
- channel.write(buffer) : channelIO(null, channel, buffer);
- if (count > 0) {
- metrics.sentBytes(count);
- }
+ protected long channelWrite(GatheringByteChannel channel, BufferChain bufferChain)
+ throws IOException {
+ long count = bufferChain.write(channel, NIO_BUFFER_LIMIT);
+ if (count > 0) this.metrics.sentBytes(count);
return count;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 19 15:39:35 2013
@@ -2834,7 +2834,7 @@ public class HRegionServer implements Cl
if (existence != null){
ClientProtos.Result pbr = ProtobufUtil.toResult(existence);
builder.setResult(pbr);
- }else if (r != null) {
+ } else if (r != null) {
ClientProtos.Result pbr = ProtobufUtil.toResult(r);
builder.setResult(pbr);
}
@@ -3294,6 +3294,7 @@ public class HRegionServer implements Cl
@Override
public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
throws ServiceException {
+
// rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
// It is also the conduit via which we pass back data.
PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java Tue Nov 19 15:39:35 2013
@@ -64,7 +64,6 @@ public class TestFromClientSide3 {
private final static byte[] VAL_BYTES = Bytes.toBytes("v1");
private final static byte[] ROW_BYTES = Bytes.toBytes("r1");
-
/**
* @throws java.lang.Exception
*/
@@ -345,6 +344,7 @@ public class TestFromClientSide3 {
gets.add(new Get(ROW));
gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
+ LOG.info("Calling exists");
Boolean[] results = table.exists(gets);
assertEquals(results[0], false);
assertEquals(results[1], false);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java Tue Nov 19 15:39:35 2013
@@ -82,8 +82,7 @@ import org.junit.experimental.categories
public class TestRegionObserverInterface {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
- public static final TableName TEST_TABLE =
- TableName.valueOf("TestTable");
+ public static final TableName TEST_TABLE = TableName.valueOf("TestTable");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
@@ -111,126 +110,132 @@ public class TestRegionObserverInterface
@Test
public void testRegionObserver() throws IOException {
- TableName tableName = TEST_TABLE;
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRegionObserver");
// recreate table every time in order to reset the status of the
// coprocessor.
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDelete"},
- TEST_TABLE,
- new Boolean[] {false, false, false, false, false});
-
- Put put = new Put(ROW);
- put.add(A, A, A);
- put.add(B, B, B);
- put.add(C, C, C);
- table.put(put);
+ try {
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadDelete"},
+ tableName,
+ new Boolean[] {false, false, false, false, false});
+
+ Put put = new Put(ROW);
+ put.add(A, A, A);
+ put.add(B, B, B);
+ put.add(C, C, C);
+ table.put(put);
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
- TEST_TABLE,
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+ tableName,
new Boolean[] {false, false, true, true, true, true, false}
- );
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
- TEST_TABLE,
- new Integer[] {1, 1, 0, 0});
-
- Get get = new Get(ROW);
- get.addColumn(A, A);
- get.addColumn(B, B);
- get.addColumn(C, C);
- table.get(get);
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDelete"},
- TEST_TABLE,
- new Boolean[] {true, true, true, true, false}
- );
-
- Delete delete = new Delete(ROW);
- delete.deleteColumn(A, A);
- delete.deleteColumn(B, B);
- delete.deleteColumn(C, C);
- table.delete(delete);
+ );
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
- TEST_TABLE,
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
+ tableName,
+ new Integer[] {1, 1, 0, 0});
+
+ Get get = new Get(ROW);
+ get.addColumn(A, A);
+ get.addColumn(B, B);
+ get.addColumn(C, C);
+ table.get(get);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadDelete"},
+ tableName,
+ new Boolean[] {true, true, true, true, false}
+ );
+
+ Delete delete = new Delete(ROW);
+ delete.deleteColumn(A, A);
+ delete.deleteColumn(B, B);
+ delete.deleteColumn(C, C);
+ table.delete(delete);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+ tableName,
new Boolean[] {true, true, true, true, true, true, true}
- );
- util.deleteTable(tableName);
- table.close();
-
+ );
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
verifyMethodResult(SimpleRegionObserver.class,
new String[] {"getCtPreOpen", "getCtPostOpen", "getCtPreClose", "getCtPostClose"},
- TEST_TABLE,
+ tableName,
new Integer[] {1, 1, 1, 1});
}
@Test
public void testRowMutation() throws IOException {
- TableName tableName = TEST_TABLE;
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRowMutation");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
- verifyMethodResult(SimpleRegionObserver.class,
+ try {
+ verifyMethodResult(SimpleRegionObserver.class,
new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
"hadDeleted"},
- TEST_TABLE,
+ tableName,
new Boolean[] {false, false, false, false, false});
-
- Put put = new Put(ROW);
- put.add(A, A, A);
- put.add(B, B, B);
- put.add(C, C, C);
-
- Delete delete = new Delete(ROW);
- delete.deleteColumn(A, A);
- delete.deleteColumn(B, B);
- delete.deleteColumn(C, C);
-
- RowMutations arm = new RowMutations(ROW);
- arm.add(put);
- arm.add(delete);
- table.mutateRow(arm);
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadDeleted"},
- TEST_TABLE,
- new Boolean[] {false, false, true, true, true}
- );
- util.deleteTable(tableName);
- table.close();
+ Put put = new Put(ROW);
+ put.add(A, A, A);
+ put.add(B, B, B);
+ put.add(C, C, C);
+
+ Delete delete = new Delete(ROW);
+ delete.deleteColumn(A, A);
+ delete.deleteColumn(B, B);
+ delete.deleteColumn(C, C);
+
+ RowMutations arm = new RowMutations(ROW);
+ arm.add(put);
+ arm.add(delete);
+ table.mutateRow(arm);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadDeleted"},
+ tableName,
+ new Boolean[] {false, false, true, true, true}
+ );
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
}
@Test
public void testIncrementHook() throws IOException {
- TableName tableName = TEST_TABLE;
-
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testIncrementHook");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
- Increment inc = new Increment(Bytes.toBytes(0));
- inc.addColumn(A, A, 1);
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreIncrement", "hadPostIncrement"},
- tableName,
- new Boolean[] {false, false}
- );
-
- table.increment(inc);
+ try {
+ Increment inc = new Increment(Bytes.toBytes(0));
+ inc.addColumn(A, A, 1);
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreIncrement", "hadPostIncrement"},
- tableName,
- new Boolean[] {true, true}
- );
- util.deleteTable(tableName);
- table.close();
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreIncrement", "hadPostIncrement"},
+ tableName,
+ new Boolean[] {false, false}
+ );
+
+ table.increment(inc);
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreIncrement", "hadPostIncrement"},
+ tableName,
+ new Boolean[] {true, true}
+ );
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
}
@Test
@@ -464,84 +469,82 @@ public class TestRegionObserverInterface
@Test
public void bulkLoadHFileTest() throws Exception {
String testName = TestRegionObserverInterface.class.getName()+".bulkLoadHFileTest";
- TableName tableName = TEST_TABLE;
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".bulkLoadHFileTest");
Configuration conf = util.getConfiguration();
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
- tableName,
- new Boolean[] {false, false}
- );
-
- FileSystem fs = util.getTestFileSystem();
- final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(A));
-
- createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
-
- //Bulk load
- new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
- tableName,
- new Boolean[] {true, true}
- );
- util.deleteTable(tableName);
- table.close();
+ try {
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+ tableName,
+ new Boolean[] {false, false}
+ );
+
+ FileSystem fs = util.getTestFileSystem();
+ final Path dir = util.getDataTestDirOnTestFS(testName).makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(A));
+
+ createHFile(util.getConfiguration(), fs, new Path(familyDir,Bytes.toString(A)), A, A);
+
+ //Bulk load
+ new LoadIncrementalHFiles(conf).doBulkLoad(dir, new HTable(conf, tableName));
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreBulkLoadHFile", "hadPostBulkLoadHFile"},
+ tableName,
+ new Boolean[] {true, true}
+ );
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
}
@Test
public void testRecovery() throws Exception {
- LOG.info(TestRegionObserverInterface.class.getName()+".testRecovery");
- TableName tableName = TEST_TABLE;
-
+ LOG.info(TestRegionObserverInterface.class.getName() +".testRecovery");
+ TableName tableName = TableName.valueOf(TEST_TABLE.getNameAsString() + ".testRecovery");
HTable table = util.createTable(tableName, new byte[][] {A, B, C});
+ try {
+ JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
+ ServerName sn2 = rs1.getRegionServer().getServerName();
+ String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
+
+ util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
+ while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
+ Thread.sleep(100);
+ }
- JVMClusterUtil.RegionServerThread rs1 = cluster.startRegionServer();
- ServerName sn2 = rs1.getRegionServer().getServerName();
- String regEN = table.getRegionLocations().firstEntry().getKey().getEncodedName();
-
- util.getHBaseAdmin().move(regEN.getBytes(), sn2.getServerName().getBytes());
- while (!sn2.equals(table.getRegionLocations().firstEntry().getValue() )){
- Thread.sleep(100);
- }
-
- Put put = new Put(ROW);
- put.add(A, A, A);
- put.add(B, B, B);
- put.add(C, C, C);
- table.put(put);
+ Put put = new Put(ROW);
+ put.add(A, A, A);
+ put.add(B, B, B);
+ put.add(C, C, C);
+ table.put(put);
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
- "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
- TEST_TABLE,
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"hadPreGet", "hadPostGet", "hadPrePut", "hadPostPut",
+ "hadPreBatchMutate", "hadPostBatchMutate", "hadDelete"},
+ tableName,
new Boolean[] {false, false, true, true, true, true, false}
- );
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
- TEST_TABLE,
- new Integer[] {0, 0, 1, 1});
-
- cluster.killRegionServer(rs1.getRegionServer().getServerName());
- Threads.sleep(20000); // just to be sure that the kill has fully started.
- util.waitUntilAllRegionsAssigned(tableName);
+ );
- verifyMethodResult(SimpleRegionObserver.class,
- new String[]{"getCtPreWALRestore", "getCtPostWALRestore"},
- TEST_TABLE,
- new Integer[]{1, 1});
-
- verifyMethodResult(SimpleRegionObserver.class,
- new String[]{"getCtPrePut", "getCtPostPut"},
- TEST_TABLE,
- new Integer[]{0, 0});
-
- util.deleteTable(tableName);
- table.close();
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[] {"getCtPreWALRestore", "getCtPostWALRestore", "getCtPrePut", "getCtPostPut"},
+ tableName,
+ new Integer[] {0, 0, 1, 1});
+
+ cluster.killRegionServer(rs1.getRegionServer().getServerName());
+ Threads.sleep(1000); // Let the kill soak in.
+ util.waitUntilAllRegionsAssigned(tableName);
+ LOG.info("All regions assigned");
+
+ verifyMethodResult(SimpleRegionObserver.class,
+ new String[]{"getCtPrePut", "getCtPostPut"},
+ tableName,
+ new Integer[]{0, 0});
+ } finally {
+ util.deleteTable(tableName);
+ table.close();
+ }
}
@Test
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java?rev=1543458&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBufferChain.java Tue Nov 19 15:39:35 2013
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Category(SmallTests.class)
+public class TestBufferChain {
+ private File tmpFile;
+
+ private static final byte[][] HELLO_WORLD_CHUNKS = new byte[][] {
+ "hello".getBytes(Charsets.UTF_8),
+ " ".getBytes(Charsets.UTF_8),
+ "world".getBytes(Charsets.UTF_8)
+ };
+
+ @Before
+ public void setup() throws IOException {
+ tmpFile = File.createTempFile("TestBufferChain", "txt");
+ }
+
+ @After
+ public void teardown() {
+ tmpFile.delete();
+ }
+
+ @Test
+ public void testGetBackBytesWePutIn() {
+ ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+ BufferChain chain = new BufferChain(bufs);
+ assertTrue(Bytes.equals(Bytes.toBytes("hello world"), chain.getBytes()));
+ }
+
+ @Test
+ public void testChainChunkBiggerThanWholeArray() throws IOException {
+ ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+ BufferChain chain = new BufferChain(bufs);
+ writeAndVerify(chain, "hello world", 8192);
+ assertNoRemaining(bufs);
+ }
+
+ @Test
+ public void testChainChunkBiggerThanSomeArrays() throws IOException {
+ ByteBuffer[] bufs = wrapArrays(HELLO_WORLD_CHUNKS);
+ BufferChain chain = new BufferChain(bufs);
+ writeAndVerify(chain, "hello world", 3);
+ assertNoRemaining(bufs);
+ }
+
+ @Test
+ public void testLimitOffset() throws IOException {
+ ByteBuffer[] bufs = new ByteBuffer[] {
+ stringBuf("XXXhelloYYY", 3, 5),
+ stringBuf(" ", 0, 1),
+ stringBuf("XXXXworldY", 4, 5) };
+ BufferChain chain = new BufferChain(bufs);
+ writeAndVerify(chain , "hello world", 3);
+ assertNoRemaining(bufs);
+ }
+
+ @Test
+ public void testWithSpy() throws IOException {
+ ByteBuffer[] bufs = new ByteBuffer[] {
+ stringBuf("XXXhelloYYY", 3, 5),
+ stringBuf(" ", 0, 1),
+ stringBuf("XXXXworldY", 4, 5) };
+ BufferChain chain = new BufferChain(bufs);
+ FileOutputStream fos = new FileOutputStream(tmpFile);
+ FileChannel ch = Mockito.spy(fos.getChannel());
+ try {
+ chain.write(ch, 2);
+ assertEquals("he", Files.toString(tmpFile, Charsets.UTF_8));
+ chain.write(ch, 2);
+ assertEquals("hell", Files.toString(tmpFile, Charsets.UTF_8));
+ chain.write(ch, 3);
+ assertEquals("hello w", Files.toString(tmpFile, Charsets.UTF_8));
+ chain.write(ch, 8);
+ assertEquals("hello world", Files.toString(tmpFile, Charsets.UTF_8));
+ } finally {
+ ch.close();
+ fos.close();
+ }
+ }
+
+ private ByteBuffer stringBuf(String string, int position, int length) {
+ ByteBuffer buf = ByteBuffer.wrap(string.getBytes(Charsets.UTF_8));
+ buf.position(position);
+ buf.limit(position + length);
+ assertTrue(buf.hasRemaining());
+ return buf;
+ }
+
+ private void assertNoRemaining(ByteBuffer[] bufs) {
+ for (ByteBuffer buf : bufs) {
+ assertFalse(buf.hasRemaining());
+ }
+ }
+
+ private ByteBuffer[] wrapArrays(byte[][] arrays) {
+ ByteBuffer[] ret = new ByteBuffer[arrays.length];
+ for (int i = 0; i < arrays.length; i++) {
+ ret[i] = ByteBuffer.wrap(arrays[i]);
+ }
+ return ret;
+ }
+
+ private void writeAndVerify(BufferChain chain, String string, int chunkSize)
+ throws IOException {
+ FileOutputStream fos = new FileOutputStream(tmpFile);
+ FileChannel ch = fos.getChannel();
+ try {
+ long remaining = string.length();
+ while (chain.hasRemaining()) {
+ long n = chain.write(ch, chunkSize);
+ assertTrue(n == chunkSize || n == remaining);
+ remaining -= n;
+ }
+ assertEquals(0, remaining);
+ } finally {
+ fos.close();
+ }
+ assertFalse(chain.hasRemaining());
+ assertEquals(string, Files.toString(tmpFile, Charsets.UTF_8));
+ }
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java?rev=1543458&r1=1543457&r2=1543458&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Tue Nov 19 15:39:35 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.CellScann
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.SmallTests;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -76,6 +79,7 @@ import org.mockito.stubbing.Answer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
@@ -89,6 +93,8 @@ public class TestIPC {
public static final Log LOG = LogFactory.getLog(TestIPC.class);
static byte [] CELL_BYTES = Bytes.toBytes("xyz");
static Cell CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
+ static byte [] BIG_CELL_BYTES = new byte [10 * 1024];
+ static Cell BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
private final static Configuration CONF = HBaseConfiguration.create();
// We are using the test TestRpcServiceProtos generated classes and Service because they are
// available and basic with methods like 'echo', and ping. Below we make a blocking service
@@ -303,8 +309,10 @@ public class TestIPC {
int cellcount = Integer.parseInt(args[1]);
Configuration conf = HBaseConfiguration.create();
TestRpcServer rpcServer = new TestRpcServer();
+ MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+ EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
- KeyValue kv = KeyValueUtil.ensureKeyValue(CELL);
+ KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
Put p = new Put(kv.getRow());
for (int i = 0; i < cellcount; i++) {
p.add(kv);
@@ -324,15 +332,17 @@ public class TestIPC {
RegionAction.newBuilder(),
ClientProtos.Action.newBuilder(),
MutationProto.newBuilder());
- CellScanner cellScanner = CellUtil.createCellScanner(cells);
- if (i % 1000 == 0) {
+ builder.setRegion(RegionSpecifier.newBuilder().setType(RegionSpecifierType.REGION_NAME).
+ setValue(ByteString.copyFrom(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
+ if (i % 100000 == 0) {
LOG.info("" + i);
// Uncomment this for a thread dump every so often.
// ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
// "Thread dump " + Thread.currentThread().getName());
}
+ CellScanner cellScanner = CellUtil.createCellScanner(cells);
Pair<Message, CellScanner> response =
- client.call(null, builder.build(), cellScanner, null, user, address, 0);
+ client.call(md, builder.build(), cellScanner, param, user, address, 0);
/*
int count = 0;
while (p.getSecond().advance()) {