You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/03/20 20:39:51 UTC
svn commit: r1459015 [2/8] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/
hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/
hbase-client/src/test/...
Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ReflectionCache.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.IpcProtocol;
+
+
+import com.google.protobuf.Message;
+
+/**
+ * Saves on reflection by caching the {@link Method} instances looked up on IPC protocol
+ * interfaces and the default protobuf {@link Message} instance of each method's request
+ * argument.
+ * <p>
+ * Both caches are {@link ConcurrentHashMap}s. A lookup miss and the insert that follows it are
+ * not atomic, so concurrent callers may each resolve the same entry; the last writer wins.
+ */
+class ReflectionCache {
+  /**
+   * Request prototypes keyed by the {@link Method} itself (declaring class, name and parameter
+   * types) so that same-named methods of different protocols never share an entry.
+   */
+  private final Map<Method, Message> methodArgCache = new ConcurrentHashMap<Method, Message>();
+
+  /** Resolved methods keyed by protocol class name + '#' + method name. */
+  private final Map<String, Method> methodInstanceCache = new ConcurrentHashMap<String, Method>();
+
+  public ReflectionCache() {
+    super();
+  }
+
+  /**
+   * Finds the public method called <code>methodName</code> on <code>protocol</code>.
+   * The result is cached per protocol: keying on the bare method name would hand the first
+   * protocol's method to every other protocol that declares a method of the same name.
+   *
+   * @param protocol the protocol interface to search
+   * @param methodName simple name of the method wanted
+   * @return the first method of that name returned by {@link Class#getMethods()}, or null if the
+   *   protocol has no such method (misses are not cached)
+   */
+  Method getMethod(Class<? extends IpcProtocol> protocol, String methodName) {
+    String key = protocol.getName() + '#' + methodName;
+    Method method = this.methodInstanceCache.get(key);
+    if (method != null) {
+      return method;
+    }
+    for (Method m : protocol.getMethods()) {
+      if (m.getName().equals(methodName)) {
+        m.setAccessible(true);
+        this.methodInstanceCache.put(key, m);
+        return m;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the default instance of the protobuf message type that <code>method</code> takes as
+   * its request argument, obtained by reflectively calling the argument class's static
+   * <code>getDefaultInstance</code> method.
+   *
+   * @param method a protocol method taking either (controller, request) or (request)
+   * @return the request type's default instance, or null if the method has neither one nor two
+   *   parameters
+   * @throws Exception if the argument type has no accessible static
+   *   <code>getDefaultInstance</code> method or invoking it fails
+   */
+  Message getMethodArgType(Method method) throws Exception {
+    Message protoType = this.methodArgCache.get(method);
+    if (protoType != null) {
+      return protoType;
+    }
+    Class<?>[] args = method.getParameterTypes();
+    Class<?> arg;
+    if (args.length == 2) {
+      // RpcController + Message in the method args (generated code from RPC bits in .proto
+      // files have RpcController); only the second argument, the Message, is significant.
+      arg = args[1];
+    } else if (args.length == 1) {
+      arg = args[0];
+    } else {
+      // Unexpected signature.
+      return null;
+    }
+    Method newInstMethod = arg.getMethod("getDefaultInstance");
+    newInstMethod.setAccessible(true);
+    // Static method: no receiver and no arguments.
+    protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
+    this.methodArgCache.put(method, protoType);
+    return protoType;
+  }
+}
\ No newline at end of file
Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RemoteWithExtrasException.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A {@link RemoteException} that additionally records the host and port it is associated with
+ * and a flag saying whether the origin exception was a do-not-retry type (for example a
+ * {@link DoNotRetryIOException}); see {@link #isDoNotRetry()}.
+ */
+@SuppressWarnings("serial")
+@InterfaceAudience.Private
+public class RemoteWithExtrasException extends RemoteException {
+  private final boolean doNotRetry;
+  private final String hostname;
+  private final int port;
+
+  /**
+   * Creates an exception with no host information: hostname is null and port is -1.
+   *
+   * @param className class name handed to {@link RemoteException}
+   * @param msg message handed to {@link RemoteException}
+   * @param doNotRetry value reported by {@link #isDoNotRetry()}
+   */
+  public RemoteWithExtrasException(String className, String msg, final boolean doNotRetry) {
+    super(className, msg);
+    hostname = null;
+    port = -1;
+    this.doNotRetry = doNotRetry;
+  }
+
+  /**
+   * Creates an exception carrying host information.
+   *
+   * @param className class name handed to {@link RemoteException}
+   * @param msg message handed to {@link RemoteException}
+   * @param hostname value reported by {@link #getHostname()}; may be null
+   * @param port value reported by {@link #getPort()}
+   * @param doNotRetry value reported by {@link #isDoNotRetry()}
+   */
+  public RemoteWithExtrasException(String className, String msg, final String hostname,
+      final int port, final boolean doNotRetry) {
+    super(className, msg);
+    this.doNotRetry = doNotRetry;
+    this.port = port;
+    this.hostname = hostname;
+  }
+
+  /**
+   * @return True if origin exception was a do not retry type.
+   */
+  public boolean isDoNotRetry() {
+    return doNotRetry;
+  }
+
+  /**
+   * @return null if not set
+   */
+  public String getHostname() {
+    return hostname;
+  }
+
+  /**
+   * @return -1 if not set
+   */
+  public int getPort() {
+    return port;
+  }
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Wed Mar 20 19:39:50 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.protobuf;
+
import static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
import java.io.ByteArrayOutputStream;
@@ -34,7 +35,21 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableSet;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcChannel;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import com.google.protobuf.TextFormat;
+
+
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -42,20 +57,15 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MasterAdminProtocol;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.MultiAction;
-import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -88,12 +98,11 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.DeleteType;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.DeleteType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
@@ -114,16 +123,6 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcChannel;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
/**
* Protobufs utility.
*/
@@ -342,43 +341,75 @@ public final class ProtobufUtil {
}
/**
- * Convert a protocol buffer Mutate to a Put
+ * Convert a protocol buffer MutationProto to a Put.
*
- * @param proto the protocol buffer Mutate to convert
- * @return the converted client Put
- * @throws DoNotRetryIOException
+ * @param proto The protocol buffer MutationProto to convert
+ * @return A client Put.
+ * @throws IOException
*/
- public static Put toPut(
- final Mutate proto) throws DoNotRetryIOException {
- MutateType type = proto.getMutateType();
- assert type == MutateType.PUT : type.name();
- byte[] row = proto.getRow().toByteArray();
- long timestamp = HConstants.LATEST_TIMESTAMP;
- if (proto.hasTimestamp()) {
- timestamp = proto.getTimestamp();
- }
- Put put = new Put(row, timestamp);
- put.setWriteToWAL(proto.getWriteToWAL());
- for (NameBytesPair attribute: proto.getAttributeList()) {
- put.setAttribute(attribute.getName(),
- attribute.getValue().toByteArray());
- }
- for (ColumnValue column: proto.getColumnValueList()) {
- byte[] family = column.getFamily().toByteArray();
- for (QualifierValue qv: column.getQualifierValueList()) {
- byte[] qualifier = qv.getQualifier().toByteArray();
- if (!qv.hasValue()) {
- throw new DoNotRetryIOException(
- "Missing required field: qualifer value");
+ public static Put toPut(final MutationProto proto)
+ throws IOException {
+ return toPut(proto, null);
+ }
+
+ /**
+ * Convert a protocol buffer MutationProto to a Put.
+ *
+ * @param proto The protocol buffer MutationProto to convert
+ * @param cellScanner If non-null, the Cell data that goes with this proto.
+ * @return A client Put.
+ * @throws IOException
+ */
+ public static Put toPut(final MutationProto proto, final CellScanner cellScanner)
+ throws IOException {
+ // TODO: Server-side at least why do we convert back to the Client types? Why not just pb it?
+ MutationType type = proto.getMutateType();
+ assert type == MutationType.PUT: type.name();
+ byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+ long timestamp = proto.hasTimestamp()? proto.getTimestamp(): HConstants.LATEST_TIMESTAMP;
+ Put put = null;
+ int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+ if (cellCount > 0) {
+ // The proto has metadata only and the data is separate to be found in the cellScanner.
+ if (cellScanner == null) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+ TextFormat.shortDebugString(proto));
+ }
+ for (int i = 0; i < cellCount; i++) {
+ if (!cellScanner.advance()) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+ " no cell returned: " + TextFormat.shortDebugString(proto));
+ }
+ Cell cell = cellScanner.current();
+ if (put == null) {
+ put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), timestamp);
}
- byte[] value = qv.getValue().toByteArray();
- long ts = timestamp;
- if (qv.hasTimestamp()) {
- ts = qv.getTimestamp();
+ put.add(KeyValueUtil.ensureKeyValue(cell));
+ }
+ } else {
+ put = new Put(row, timestamp);
+ // The proto has the metadata and the data itself
+ for (ColumnValue column: proto.getColumnValueList()) {
+ byte[] family = column.getFamily().toByteArray();
+ for (QualifierValue qv: column.getQualifierValueList()) {
+ byte[] qualifier = qv.getQualifier().toByteArray();
+ if (!qv.hasValue()) {
+ throw new DoNotRetryIOException(
+ "Missing required field: qualifer value");
+ }
+ byte[] value = qv.getValue().toByteArray();
+ long ts = timestamp;
+ if (qv.hasTimestamp()) {
+ ts = qv.getTimestamp();
+ }
+ put.add(family, qualifier, ts, value);
}
- put.add(family, qualifier, ts, value);
}
}
+ put.setWriteToWAL(proto.getWriteToWAL());
+ for (NameBytesPair attribute: proto.getAttributeList()) {
+ put.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+ }
return put;
}
@@ -387,74 +418,130 @@ public final class ProtobufUtil {
*
* @param proto the protocol buffer Mutate to convert
* @return the converted client Delete
+ * @throws IOException
*/
- public static Delete toDelete(final Mutate proto) {
- MutateType type = proto.getMutateType();
- assert type == MutateType.DELETE : type.name();
- byte[] row = proto.getRow().toByteArray();
+ public static Delete toDelete(final MutationProto proto)
+ throws IOException {
+ return toDelete(proto, null);
+ }
+
+ /**
+ * Convert a protocol buffer MutationProto to a Delete
+ *
+ * @param proto the protocol buffer MutationProto to convert
+ * @param cellScanner if non-null, the data that goes with this delete.
+ * @return the converted client Delete
+ * @throws IOException
+ */
+ public static Delete toDelete(final MutationProto proto, final CellScanner cellScanner)
+ throws IOException {
+ MutationType type = proto.getMutateType();
+ assert type == MutationType.DELETE : type.name();
+ byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
long timestamp = HConstants.LATEST_TIMESTAMP;
if (proto.hasTimestamp()) {
timestamp = proto.getTimestamp();
}
- Delete delete = new Delete(row, timestamp);
- delete.setWriteToWAL(proto.getWriteToWAL());
- for (NameBytesPair attribute: proto.getAttributeList()) {
- delete.setAttribute(attribute.getName(),
- attribute.getValue().toByteArray());
- }
- for (ColumnValue column: proto.getColumnValueList()) {
- byte[] family = column.getFamily().toByteArray();
- for (QualifierValue qv: column.getQualifierValueList()) {
- DeleteType deleteType = qv.getDeleteType();
- byte[] qualifier = null;
- if (qv.hasQualifier()) {
- qualifier = qv.getQualifier().toByteArray();
+ Delete delete = null;
+ int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+ if (cellCount > 0) {
+ // The proto has metadata only and the data is separate to be found in the cellScanner.
+ if (cellScanner == null) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+ TextFormat.shortDebugString(proto));
+ }
+ for (int i = 0; i < cellCount; i++) {
+ if (!cellScanner.advance()) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+ " no cell returned: " + TextFormat.shortDebugString(proto));
}
- long ts = HConstants.LATEST_TIMESTAMP;
- if (qv.hasTimestamp()) {
- ts = qv.getTimestamp();
+ Cell cell = cellScanner.current();
+ if (delete == null) {
+ delete =
+ new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), timestamp);
}
- if (deleteType == DeleteType.DELETE_ONE_VERSION) {
- delete.deleteColumn(family, qualifier, ts);
- } else if (deleteType == DeleteType.DELETE_MULTIPLE_VERSIONS) {
- delete.deleteColumns(family, qualifier, ts);
- } else {
- delete.deleteFamily(family, ts);
+ delete.addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
+ }
+ } else {
+ delete = new Delete(row, timestamp);
+ for (ColumnValue column: proto.getColumnValueList()) {
+ byte[] family = column.getFamily().toByteArray();
+ for (QualifierValue qv: column.getQualifierValueList()) {
+ DeleteType deleteType = qv.getDeleteType();
+ byte[] qualifier = null;
+ if (qv.hasQualifier()) {
+ qualifier = qv.getQualifier().toByteArray();
+ }
+ long ts = HConstants.LATEST_TIMESTAMP;
+ if (qv.hasTimestamp()) {
+ ts = qv.getTimestamp();
+ }
+ if (deleteType == DeleteType.DELETE_ONE_VERSION) {
+ delete.deleteColumn(family, qualifier, ts);
+ } else if (deleteType == DeleteType.DELETE_MULTIPLE_VERSIONS) {
+ delete.deleteColumns(family, qualifier, ts);
+ } else {
+ delete.deleteFamily(family, ts);
+ }
}
}
}
+ delete.setWriteToWAL(proto.getWriteToWAL());
+ for (NameBytesPair attribute: proto.getAttributeList()) {
+ delete.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
+ }
return delete;
}
/**
* Convert a protocol buffer Mutate to an Append
- *
+ * @param cellScanner if non-null, the Cell data that goes with this proto
* @param proto the protocol buffer Mutate to convert
* @return the converted client Append
* @throws DoNotRetryIOException
*/
- public static Append toAppend(
- final Mutate proto) throws DoNotRetryIOException {
- MutateType type = proto.getMutateType();
- assert type == MutateType.APPEND : type.name();
- byte[] row = proto.getRow().toByteArray();
- Append append = new Append(row);
- append.setWriteToWAL(proto.getWriteToWAL());
- for (NameBytesPair attribute: proto.getAttributeList()) {
- append.setAttribute(attribute.getName(),
- attribute.getValue().toByteArray());
- }
- for (ColumnValue column: proto.getColumnValueList()) {
- byte[] family = column.getFamily().toByteArray();
- for (QualifierValue qv: column.getQualifierValueList()) {
- byte[] qualifier = qv.getQualifier().toByteArray();
- if (!qv.hasValue()) {
- throw new DoNotRetryIOException(
- "Missing required field: qualifer value");
+ public static Append toAppend(final MutationProto proto, final CellScanner cellScanner)
+ throws DoNotRetryIOException {
+ MutationType type = proto.getMutateType();
+ assert type == MutationType.APPEND : type.name();
+ byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+ Append append = null;
+ int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+ if (cellCount > 0) {
+ // The proto has metadata only and the data is separate to be found in the cellScanner.
+ if (cellScanner == null) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+ TextFormat.shortDebugString(proto));
+ }
+ for (int i = 0; i < cellCount; i++) {
+ if (!cellScanner.advance()) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+ " no cell returned: " + TextFormat.shortDebugString(proto));
+ }
+ Cell cell = cellScanner.current();
+ if (append == null) {
+ append = new Append(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
- byte[] value = qv.getValue().toByteArray();
- append.add(family, qualifier, value);
+ append.add(KeyValueUtil.ensureKeyValue(cell));
}
+ } else {
+ append = new Append(row);
+ for (ColumnValue column: proto.getColumnValueList()) {
+ byte[] family = column.getFamily().toByteArray();
+ for (QualifierValue qv: column.getQualifierValueList()) {
+ byte[] qualifier = qv.getQualifier().toByteArray();
+ if (!qv.hasValue()) {
+ throw new DoNotRetryIOException(
+ "Missing required field: qualifer value");
+ }
+ byte[] value = qv.getValue().toByteArray();
+ append.add(family, qualifier, value);
+ }
+ }
+ }
+ append.setWriteToWAL(proto.getWriteToWAL());
+ for (NameBytesPair attribute: proto.getAttributeList()) {
+ append.setAttribute(attribute.getName(), attribute.getValue().toByteArray());
}
return append;
}
@@ -466,18 +553,18 @@ public final class ProtobufUtil {
* @return the converted Mutation
* @throws IOException
*/
- public static Mutation toMutation(final Mutate proto) throws IOException {
- MutateType type = proto.getMutateType();
- if (type == MutateType.APPEND) {
- return toAppend(proto);
+ public static Mutation toMutation(final MutationProto proto) throws IOException {
+ MutationType type = proto.getMutateType();
+ if (type == MutationType.APPEND) {
+ return toAppend(proto, null);
}
- if (type == MutateType.DELETE) {
- return toDelete(proto);
+ if (type == MutationType.DELETE) {
+ return toDelete(proto, null);
}
- if (type == MutateType.PUT) {
- return toPut(proto);
+ if (type == MutationType.PUT) {
+ return toPut(proto, null);
}
- throw new IOException("Not an understood mutate type " + type);
+ throw new IOException("Unknown mutation type " + type);
}
/**
@@ -487,13 +574,44 @@ public final class ProtobufUtil {
* @return the converted client Increment
* @throws IOException
*/
- public static Increment toIncrement(
- final Mutate proto) throws IOException {
- MutateType type = proto.getMutateType();
- assert type == MutateType.INCREMENT : type.name();
- byte[] row = proto.getRow().toByteArray();
- Increment increment = new Increment(row);
- increment.setWriteToWAL(proto.getWriteToWAL());
+ public static Increment toIncrement(final MutationProto proto, final CellScanner cellScanner)
+ throws IOException {
+ MutationType type = proto.getMutateType();
+ assert type == MutationType.INCREMENT : type.name();
+ byte [] row = proto.hasRow()? proto.getRow().toByteArray(): null;
+ Increment increment = null;
+ int cellCount = proto.hasAssociatedCellCount()? proto.getAssociatedCellCount(): 0;
+ if (cellCount > 0) {
+ // The proto has metadata only and the data is separate to be found in the cellScanner.
+ if (cellScanner == null) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but no cellScanner: " +
+ TextFormat.shortDebugString(proto));
+ }
+ for (int i = 0; i < cellCount; i++) {
+ if (!cellScanner.advance()) {
+ throw new DoNotRetryIOException("Cell count of " + cellCount + " but at index " + i +
+ " no cell returned: " + TextFormat.shortDebugString(proto));
+ }
+ Cell cell = cellScanner.current();
+ if (increment == null) {
+ increment = new Increment(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+ }
+ increment.add(KeyValueUtil.ensureKeyValue(cell));
+ }
+ } else {
+ increment = new Increment(row);
+ for (ColumnValue column: proto.getColumnValueList()) {
+ byte[] family = column.getFamily().toByteArray();
+ for (QualifierValue qv: column.getQualifierValueList()) {
+ byte[] qualifier = qv.getQualifier().toByteArray();
+ if (!qv.hasValue()) {
+ throw new DoNotRetryIOException("Missing required field: qualifer value");
+ }
+ long value = Bytes.toLong(qv.getValue().toByteArray());
+ increment.addColumn(family, qualifier, value);
+ }
+ }
+ }
if (proto.hasTimeRange()) {
HBaseProtos.TimeRange timeRange = proto.getTimeRange();
long minStamp = 0;
@@ -506,18 +624,7 @@ public final class ProtobufUtil {
}
increment.setTimeRange(minStamp, maxStamp);
}
- for (ColumnValue column: proto.getColumnValueList()) {
- byte[] family = column.getFamily().toByteArray();
- for (QualifierValue qv: column.getQualifierValueList()) {
- byte[] qualifier = qv.getQualifier().toByteArray();
- if (!qv.hasValue()) {
- throw new DoNotRetryIOException(
- "Missing required field: qualifer value");
- }
- long value = Bytes.toLong(qv.getValue().toByteArray());
- increment.addColumn(family, qualifier, value);
- }
- }
+ increment.setWriteToWAL(proto.getWriteToWAL());
return increment;
}
@@ -733,10 +840,10 @@ public final class ProtobufUtil {
* @param increment
* @return the converted mutate
*/
- public static Mutate toMutate(final Increment increment) {
- Mutate.Builder builder = Mutate.newBuilder();
+ public static MutationProto toMutation(final Increment increment) {
+ MutationProto.Builder builder = MutationProto.newBuilder();
builder.setRow(ByteString.copyFrom(increment.getRow()));
- builder.setMutateType(MutateType.INCREMENT);
+ builder.setMutateType(MutationType.INCREMENT);
builder.setWriteToWAL(increment.getWriteToWAL());
TimeRange timeRange = increment.getTimeRange();
if (!timeRange.isAllTime()) {
@@ -768,27 +875,14 @@ public final class ProtobufUtil {
/**
* Create a protocol buffer Mutate based on a client Mutation
*
- * @param mutateType
+ * @param type
* @param mutation
- * @return a mutate
+ * @return a protobuf'd Mutation
* @throws IOException
*/
- public static Mutate toMutate(final MutateType mutateType,
- final Mutation mutation) throws IOException {
- Mutate.Builder mutateBuilder = Mutate.newBuilder();
- mutateBuilder.setRow(ByteString.copyFrom(mutation.getRow()));
- mutateBuilder.setMutateType(mutateType);
- mutateBuilder.setWriteToWAL(mutation.getWriteToWAL());
- mutateBuilder.setTimestamp(mutation.getTimeStamp());
- Map<String, byte[]> attributes = mutation.getAttributesMap();
- if (!attributes.isEmpty()) {
- NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
- for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
- attributeBuilder.setName(attribute.getKey());
- attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
- mutateBuilder.addAttribute(attributeBuilder.build());
- }
- }
+ public static MutationProto toMutation(final MutationType type, final Mutation mutation)
+ throws IOException {
+ MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
for (Map.Entry<byte[],List<? extends Cell>> family: mutation.getFamilyMap().entrySet()) {
@@ -799,15 +893,56 @@ public final class ProtobufUtil {
valueBuilder.setQualifier(ByteString.copyFrom(kv.getQualifier()));
valueBuilder.setValue(ByteString.copyFrom(kv.getValue()));
valueBuilder.setTimestamp(kv.getTimestamp());
- if (mutateType == MutateType.DELETE) {
+ if (type == MutationType.DELETE) {
KeyValue.Type keyValueType = KeyValue.Type.codeToType(kv.getType());
valueBuilder.setDeleteType(toDeleteType(keyValueType));
}
columnBuilder.addQualifierValue(valueBuilder.build());
}
- mutateBuilder.addColumnValue(columnBuilder.build());
+ builder.addColumnValue(columnBuilder.build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MutationProto based on a client Mutation. Does NOT include data.
+ * Understanding is that the Cell will be transported other than via protobuf.
+ * @param type
+ * @param mutation
+ * @return a protobuf'd Mutation
+ * @throws IOException
+ */
+ public static MutationProto toMutationNoData(final MutationType type, final Mutation mutation)
+ throws IOException {
+ MutationProto.Builder builder = getMutationBuilderAndSetCommonFields(type, mutation);
+ builder.setAssociatedCellCount(mutation.size());
+ return builder.build();
+ }
+
+ /**
+ * Code shared by {@link #toMutation(MutationType, Mutation)} and
+ * {@link #toMutationNoData(MutationType, Mutation)}
+ * @param type
+ * @param mutation
+ * @return A partly-filled out protobuf'd Mutation.
+ */
+ private static MutationProto.Builder getMutationBuilderAndSetCommonFields(final MutationType type,
+ final Mutation mutation) {
+ MutationProto.Builder builder = MutationProto.newBuilder();
+ builder.setRow(ByteString.copyFrom(mutation.getRow()));
+ builder.setMutateType(type);
+ builder.setWriteToWAL(mutation.getWriteToWAL());
+ builder.setTimestamp(mutation.getTimeStamp());
+ Map<String, byte[]> attributes = mutation.getAttributesMap();
+ if (!attributes.isEmpty()) {
+ NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
+ for (Map.Entry<String, byte[]> attribute: attributes.entrySet()) {
+ attributeBuilder.setName(attribute.getKey());
+ attributeBuilder.setValue(ByteString.copyFrom(attribute.getValue()));
+ builder.addAttribute(attributeBuilder.build());
+ }
}
- return mutateBuilder.build();
+ return builder;
}
/**
@@ -821,25 +956,66 @@ public final class ProtobufUtil {
Cell [] cells = result.raw();
if (cells != null) {
for (Cell c : cells) {
- builder.addKeyValue(toKeyValue(c));
+ builder.addCell(toCell(c));
}
}
return builder.build();
}
/**
+ * Convert a client Result to a protocol buffer Result.
+ * The pb Result does not include the Cell data. That is for transport otherwise.
+ *
+ * @param result the client Result to convert
+ * @return the converted protocol buffer Result
+ */
+ public static ClientProtos.Result toResultNoData(final Result result) {
+ ClientProtos.Result.Builder builder = ClientProtos.Result.newBuilder();
+ builder.setAssociatedCellCount(result.size());
+ return builder.build();
+ }
+
+ /**
* Convert a protocol buffer Result to a client Result
*
* @param proto the protocol buffer Result to convert
* @return the converted client Result
*/
public static Result toResult(final ClientProtos.Result proto) {
- List<HBaseProtos.KeyValue> values = proto.getKeyValueList();
- List<KeyValue> keyValues = new ArrayList<KeyValue>(values.size());
- for (HBaseProtos.KeyValue kv: values) {
- keyValues.add(toKeyValue(kv));
+ List<HBaseProtos.Cell> values = proto.getCellList();
+ List<Cell> cells = new ArrayList<Cell>(values.size());
+ for (HBaseProtos.Cell c: values) {
+ cells.add(toCell(c));
+ }
+ return new Result(cells);
+ }
+
+ /**
+ * Convert a protocol buffer Result to a client Result
+ *
+ * @param proto the protocol buffer Result to convert
+ * @param scanner Optional cell scanner.
+ * @return the converted client Result
+ * @throws IOException
+ */
+ public static Result toResult(final ClientProtos.Result proto, final CellScanner scanner)
+ throws IOException {
+ // TODO: Unit test that has some Cells in scanner and some in the proto.
+ List<Cell> cells = null;
+ if (proto.hasAssociatedCellCount()) {
+ int count = proto.getAssociatedCellCount();
+ cells = new ArrayList<Cell>(count);
+ for (int i = 0; i < count; i++) {
+ if (!scanner.advance()) throw new IOException("Failed get " + i + " of " + count);
+ cells.add(scanner.current());
+ }
+ }
+ List<HBaseProtos.Cell> values = proto.getCellList();
+ if (cells == null) cells = new ArrayList<Cell>(values.size());
+ for (HBaseProtos.Cell c: values) {
+ cells.add(toCell(c));
}
- return new Result(keyValues);
+ return new Result(cells);
}
/**
@@ -1012,55 +1188,6 @@ public final class ProtobufUtil {
}
/**
- * A helper to invoke a multi action using client protocol.
- *
- * @param client
- * @param multi
- * @return a multi response
- * @throws IOException
- */
- public static <R> MultiResponse multi(final ClientProtocol client,
- final MultiAction<R> multi) throws IOException {
- MultiResponse response = new MultiResponse();
- for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
- byte[] regionName = e.getKey();
- int rowMutations = 0;
- List<Action<R>> actions = e.getValue();
- for (Action<R> action: actions) {
- Row row = action.getAction();
- if (row instanceof RowMutations) {
- try {
- MultiRequest request =
- RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
- client.multi(null, request);
- response.add(regionName, action.getOriginalIndex(), new Result());
- } catch (ServiceException se) {
- response.add(regionName, action.getOriginalIndex(), getRemoteException(se));
- }
- rowMutations++;
- }
- }
- if (actions.size() > rowMutations) {
- Exception ex = null;
- List<Object> results = null;
- try {
- MultiRequest request =
- RequestConverter.buildMultiRequest(regionName, actions);
- ClientProtos.MultiResponse proto = client.multi(null, request);
- results = ResponseConverter.getResults(proto);
- } catch (ServiceException se) {
- ex = getRemoteException(se);
- }
- for (int i = 0, n = actions.size(); i < n; i++) {
- int originalIndex = actions.get(i).getOriginalIndex();
- response.add(regionName, originalIndex, results == null ? ex : results.get(i));
- }
- }
- }
- return response;
- }
-
- /**
* A helper to bulk load a list of HFiles using client protocol.
*
* @param client
@@ -1731,33 +1858,31 @@ public final class ProtobufUtil {
throw new IOException(se);
}
- public static HBaseProtos.KeyValue toKeyValue(final Cell kv) {
+ public static HBaseProtos.Cell toCell(final Cell kv) {
// Doing this is going to kill us if we do it for all data passed.
// St.Ack 20121205
- // TODO: Do a Cell version
- HBaseProtos.KeyValue.Builder kvbuilder = HBaseProtos.KeyValue.newBuilder();
+ HBaseProtos.Cell.Builder kvbuilder = HBaseProtos.Cell.newBuilder();
kvbuilder.setRow(ByteString.copyFrom(kv.getRowArray(), kv.getRowOffset(),
kv.getRowLength()));
kvbuilder.setFamily(ByteString.copyFrom(kv.getFamilyArray(),
kv.getFamilyOffset(), kv.getFamilyLength()));
kvbuilder.setQualifier(ByteString.copyFrom(kv.getQualifierArray(),
kv.getQualifierOffset(), kv.getQualifierLength()));
- kvbuilder.setKeyType(HBaseProtos.KeyType.valueOf(kv.getTypeByte()));
+ kvbuilder.setCellType(HBaseProtos.CellType.valueOf(kv.getTypeByte()));
kvbuilder.setTimestamp(kv.getTimestamp());
kvbuilder.setValue(ByteString.copyFrom(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
return kvbuilder.build();
}
- public static KeyValue toKeyValue(final HBaseProtos.KeyValue kv) {
+ public static Cell toCell(final HBaseProtos.Cell cell) {
// Doing this is going to kill us if we do it for all data passed.
// St.Ack 20121205
- // TODO: Do a Cell version
- return new KeyValue(kv.getRow().toByteArray(),
- kv.getFamily().toByteArray(),
- kv.getQualifier().toByteArray(),
- kv.getTimestamp(),
- KeyValue.Type.codeToType((byte)kv.getKeyType().getNumber()),
- kv.getValue().toByteArray());
+ return CellUtil.createCell(cell.getRow().toByteArray(),
+ cell.getFamily().toByteArray(),
+ cell.getQualifier().toByteArray(),
+ cell.getTimestamp(),
+ (byte)cell.getCellType().getNumber(),
+ cell.getValue().toByteArray());
}
/**
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Wed Mar 20 19:39:50 2013
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.protobuf;
-import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -58,11 +61,11 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.MutateType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
@@ -92,8 +95,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import java.io.IOException;
-import java.util.List;
+import com.google.protobuf.ByteString;
/**
* Helper utility to build protocol buffer requests,
@@ -206,9 +208,9 @@ public final class RequestConverter {
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- Mutate.Builder mutateBuilder = Mutate.newBuilder();
+ MutationProto.Builder mutateBuilder = MutationProto.newBuilder();
mutateBuilder.setRow(ByteString.copyFrom(row));
- mutateBuilder.setMutateType(MutateType.INCREMENT);
+ mutateBuilder.setMutateType(MutationType.INCREMENT);
mutateBuilder.setWriteToWAL(writeToWAL);
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
columnBuilder.setFamily(ByteString.copyFrom(family));
@@ -217,8 +219,7 @@ public final class RequestConverter {
valueBuilder.setQualifier(ByteString.copyFrom(qualifier));
columnBuilder.addQualifierValue(valueBuilder.build());
mutateBuilder.addColumnValue(columnBuilder.build());
-
- builder.setMutate(mutateBuilder.build());
+ builder.setMutation(mutateBuilder.build());
return builder.build();
}
@@ -245,7 +246,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
- builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
builder.setCondition(condition);
return builder.build();
}
@@ -273,7 +274,7 @@ public final class RequestConverter {
builder.setRegion(region);
Condition condition = buildCondition(
row, family, qualifier, comparator, compareType);
- builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
builder.setCondition(condition);
return builder.build();
}
@@ -292,7 +293,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutate(ProtobufUtil.toMutate(MutateType.PUT, put));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put));
return builder.build();
}
@@ -310,7 +311,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, append));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, append));
return builder.build();
}
@@ -327,7 +328,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutate(ProtobufUtil.toMutate(increment));
+ builder.setMutation(ProtobufUtil.toMutation(increment));
return builder.build();
}
@@ -345,7 +346,7 @@ public final class RequestConverter {
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
- builder.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, delete));
+ builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete));
return builder.build();
}
@@ -358,29 +359,64 @@ public final class RequestConverter {
* @throws IOException
*/
public static MultiRequest buildMultiRequest(final byte[] regionName,
- final RowMutations rowMutations) throws IOException {
- MultiRequest.Builder builder = MultiRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- builder.setAtomic(true);
+ final RowMutations rowMutations)
+ throws IOException {
+ MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
for (Mutation mutation: rowMutations.getMutations()) {
- MutateType mutateType = null;
+ MutationType mutateType = null;
if (mutation instanceof Put) {
- mutateType = MutateType.PUT;
+ mutateType = MutationType.PUT;
} else if (mutation instanceof Delete) {
- mutateType = MutateType.DELETE;
+ mutateType = MutationType.DELETE;
} else {
- throw new DoNotRetryIOException(
- "RowMutations supports only put and delete, not "
- + mutation.getClass().getName());
+ throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
+ mutation.getClass().getName());
+ }
+ MutationProto mp = ProtobufUtil.toMutation(mutateType, mutation);
+ builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Create a protocol buffer MultiRequest for row mutations that does not hold data. Data/Cells
+ * are carried outside of protobuf. Return references to the Cells in <code>cells</code> param
+ *
+ * @param regionName
+ * @param rowMutations
+ * @param cells List to which each mutation is added as a CellScannable reference to its Cells.
+ * @return a multi request minus data
+ * @throws IOException
+ */
+ public static MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+ final RowMutations rowMutations, final List<CellScannable> cells)
+ throws IOException {
+ MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, true);
+ for (Mutation mutation: rowMutations.getMutations()) {
+ MutationType type = null;
+ if (mutation instanceof Put) {
+ type = MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ type = MutationType.DELETE;
+ } else {
+ throw new DoNotRetryIOException("RowMutations supports only put and delete, not " +
+ mutation.getClass().getName());
}
- Mutate mutate = ProtobufUtil.toMutate(mutateType, mutation);
- builder.addAction(MultiAction.newBuilder().setMutate(mutate).build());
+ MutationProto mp = ProtobufUtil.toMutationNoData(type, mutation);
+ cells.add(mutation);
+ builder.addAction(MultiAction.newBuilder().setMutation(mp).build());
}
return builder.build();
}
+ private static MultiRequest.Builder getMultiRequestBuilderWithRegionAndAtomicSet(final byte [] regionName,
+ final boolean atomic) {
+ MultiRequest.Builder builder = MultiRequest.newBuilder();
+ RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+ builder.setRegion(region);
+ return builder.setAtomic(atomic);
+ }
+
/**
* Create a protocol buffer ScanRequest for a client Scan
*
@@ -475,25 +511,22 @@ public final class RequestConverter {
* @throws IOException
*/
public static <R> MultiRequest buildMultiRequest(final byte[] regionName,
- final List<Action<R>> actions) throws IOException {
- MultiRequest.Builder builder = MultiRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
+ final List<Action<R>> actions)
+ throws IOException {
+ MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
for (Action<R> action: actions) {
MultiAction.Builder protoAction = MultiAction.newBuilder();
-
Row row = action.getAction();
if (row instanceof Get) {
protoAction.setGet(ProtobufUtil.toGet((Get)row));
} else if (row instanceof Put) {
- protoAction.setMutate(ProtobufUtil.toMutate(MutateType.PUT, (Put)row));
+ protoAction.setMutation(ProtobufUtil.toMutation(MutationType.PUT, (Put)row));
} else if (row instanceof Delete) {
- protoAction.setMutate(ProtobufUtil.toMutate(MutateType.DELETE, (Delete)row));
+ protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, (Delete)row));
} else if (row instanceof Append) {
- protoAction.setMutate(ProtobufUtil.toMutate(MutateType.APPEND, (Append)row));
+ protoAction.setMutation(ProtobufUtil.toMutation(MutationType.APPEND, (Append)row));
} else if (row instanceof Increment) {
- protoAction.setMutate(ProtobufUtil.toMutate((Increment)row));
+ protoAction.setMutation(ProtobufUtil.toMutation((Increment)row));
} else if (row instanceof RowMutations) {
continue; // ignore RowMutations
} else {
@@ -505,6 +538,68 @@ public final class RequestConverter {
return builder.build();
}
+ /**
+ * Create a protocol buffer multirequest with NO data for a list of actions (the data is carried
+ * outside of protobuf). This means it just notes attributes, whether to write the
+ * WAL, etc., and the presence in protobuf serves as a placeholder for the data, which is
+ * carried separately. Note that Get is different. It does not contain 'data' and is always
+ * carried by protobuf. We return references to the data by adding them to the passed in
+ * <code>cells</code> param.
+ *
+ * RowMutations in the list (if any) will be ignored.
+ *
+ * @param regionName
+ * @param actions
+ * @param cells Place to stuff references to actual data.
+ * @return a multi request that does not carry any data.
+ * @throws IOException
+ */
+ public static <R> MultiRequest buildNoDataMultiRequest(final byte[] regionName,
+ final List<Action<R>> actions, final List<CellScannable> cells)
+ throws IOException {
+ MultiRequest.Builder builder = getMultiRequestBuilderWithRegionAndAtomicSet(regionName, false);
+ for (Action<R> action: actions) {
+ MultiAction.Builder protoAction = MultiAction.newBuilder();
+ Row row = action.getAction();
+ if (row instanceof Get) {
+ // Gets are carried by protobufs.
+ protoAction.setGet(ProtobufUtil.toGet((Get)row));
+ } else if (row instanceof Put) {
+ Put p = (Put)row;
+ cells.add(p);
+ protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.PUT, p));
+ } else if (row instanceof Delete) {
+ Delete d = (Delete)row;
+ int size = d.size();
+ // Note that a legitimate Delete may have a size of zero; i.e. a Delete that has nothing
+ // in it but the row to delete. In this case, the current implementation does not make
+ // a KeyValue to represent a delete-of-all-the-row until we serialize... For such cases
+ // where the size returned is zero, we will send the Delete fully pb'd rather than have
+ // metadata only in the pb and then send the kv along the side in cells.
+ if (size > 0) {
+ cells.add(d);
+ protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.DELETE, d));
+ } else {
+ protoAction.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, d));
+ }
+ } else if (row instanceof Append) {
+ Append a = (Append)row;
+ cells.add(a);
+ protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.APPEND, a));
+ } else if (row instanceof Increment) {
+ Increment i = (Increment)row;
+ cells.add(i);
+ protoAction.setMutation(ProtobufUtil.toMutationNoData(MutationType.INCREMENT, i));
+ } else if (row instanceof RowMutations) {
+ continue; // ignore RowMutations
+ } else {
+ throw new DoNotRetryIOException("Multi doesn't support " + row.getClass().getName());
+ }
+ builder.addAction(protoAction.build());
+ }
+ return builder.build();
+ }
+
// End utilities for Client
//Start utilities for Admin
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java Wed Mar 20 19:39:50 2013
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hbase.protobuf;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Result;
@@ -42,9 +45,8 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.security.access.UserPermission;
import org.apache.hadoop.util.StringUtils;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcController;
/**
* Helper utility to build protocol buffer responses,
@@ -78,11 +80,13 @@ public final class ResponseConverter {
* Get the results from a protocol buffer MultiResponse
*
* @param proto the protocol buffer MultiResponse to convert
- * @return the results in the MultiResponse
+ * @param cells Cells to go with the passed in <code>proto</code>. May be null only if no result in <code>proto</code> has an associated cell count.
+ * @return the results that were in the MultiResponse (a Result or an Exception).
* @throws IOException
*/
- public static List<Object> getResults(
- final ClientProtos.MultiResponse proto) throws IOException {
+ public static List<Object> getResults(final ClientProtos.MultiResponse proto,
+ final CellScanner cells)
+ throws IOException {
List<Object> results = new ArrayList<Object>();
List<ActionResult> resultList = proto.getResultList();
for (int i = 0, n = resultList.size(); i < n; i++) {
@@ -90,13 +94,8 @@ public final class ResponseConverter {
if (result.hasException()) {
results.add(ProtobufUtil.toException(result.getException()));
} else if (result.hasValue()) {
- ClientProtos.Result r = result.getValue();
- Object value = ProtobufUtil.toResult(r);
- if (value instanceof ClientProtos.Result) {
- results.add(ProtobufUtil.toResult((ClientProtos.Result)value));
- } else {
- results.add(value);
- }
+ ClientProtos.Result value = result.getValue();
+ results.add(ProtobufUtil.toResult(value, cells));
} else {
results.add(new Result());
}
Added: hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java (added)
+++ hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.codec.Codec;
+import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestIPCUtil {
+ IPCUtil util;
+ @Before
+ public void before() {
+ this.util = new IPCUtil(new Configuration());
+ }
+
+ @Test
+ public void testBuildCellBlock() throws IOException {
+ doBuildCellBlockUndoCellBlock(new KeyValueCodec(), null);
+ doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new DefaultCodec());
+ doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new GzipCodec());
+ }
+
+ void doBuildCellBlockUndoCellBlock(final Codec codec, final CompressionCodec compressor)
+ throws IOException {
+ final int count = 10;
+ Cell [] cells = getCells(count);
+ ByteBuffer bb = this.util.buildCellBlock(codec, compressor,
+ CellUtil.createCellScanner(Arrays.asList(cells).iterator()));
+ CellScanner scanner =
+ this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+ int i = 0;
+ while (scanner.advance()) {
+ i++;
+ }
+ assertEquals(count, i);
+ }
+
+ static Cell [] getCells(final int howMany) {
+ Cell [] cells = new Cell[howMany];
+ for (int i = 0; i < howMany; i++) {
+ byte [] index = Bytes.toBytes(i);
+ KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+ cells[i] = kv;
+ }
+ return cells;
+ }
+}
\ No newline at end of file
Added: hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java?rev=1459015&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java (added)
+++ hbase/branches/0.95/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestPayloadCarryingRpcController.java Wed Mar 20 19:39:50 2013
@@ -0,0 +1,156 @@
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScannable;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(SmallTests.class)
+public class TestPayloadCarryingRpcController {
+ @Test
+ public void testListOfCellScannerables() {
+ List<CellScannable> cells = new ArrayList<CellScannable>();
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ cells.add(createCell(i));
+ }
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
+ CellScanner cellScanner = controller.cellScanner();
+ int index = 0;
+ for (; cellScanner.advance(); index++) {
+ Cell cell = cellScanner.current();
+ byte [] indexBytes = Bytes.toBytes(index);
+ assertTrue("" + index, Bytes.equals(indexBytes, 0, indexBytes.length, cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()));
+ }
+ assertEquals(count, index);
+ }
+
+ /**
+ * @param index
+ * @return A CellScannable whose scanner yields a single faked out 'Cell' that does nothing but return index as its value
+ */
+ static CellScannable createCell(final int index) {
+ return new CellScannable() {
+ @Override
+ public CellScanner cellScanner() {
+ return new CellScanner() {
+ @Override
+ public Cell current() {
+ // Fake out a Cell. All this Cell has is a value that is an int in size and equal
+ // to the above 'index' param serialized as an int.
+ return new Cell() {
+ private final int i = index;
+
+ @Override
+ public byte[] getRowArray() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getRowOffset() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public short getRowLength() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte[] getFamilyArray() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getFamilyOffset() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte getFamilyLength() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte[] getQualifierArray() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getQualifierOffset() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public int getQualifierLength() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long getTimestamp() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte getTypeByte() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public long getMvccVersion() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public byte[] getValueArray() {
+ return Bytes.toBytes(this.i);
+ }
+
+ @Override
+ public int getValueOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getValueLength() {
+ return Bytes.SIZEOF_INT;
+ }
+ };
+ }
+
+ private boolean hasCell = true;
+ @Override
+ public boolean advance() {
+ // We have one Cell only so return true first time then false ever after.
+ if (!hasCell) return hasCell;
+ hasCell = false;
+ return true;
+ }
+ };
+ }
+ };
+ }
+}
Modified: hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1459015&r1=1459014&r2=1459015&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.95/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Wed Mar 20 19:39:50 2013
@@ -55,8 +55,8 @@ public final class HConstants {
/**
* The first four bytes of Hadoop RPC connections
*/
- public static final ByteBuffer RPC_HEADER = ByteBuffer.wrap("hrpc".getBytes());
- public static final byte CURRENT_VERSION = 5;
+ public static final ByteBuffer RPC_HEADER = ByteBuffer.wrap("HBas".getBytes());
+ public static final byte RPC_CURRENT_VERSION = 0;
// HFileBlock constants.