You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/11/22 00:11:56 UTC
svn commit: r1412356 [2/2] - in /hbase/trunk:
hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/
hbase-protocol/src/main/protobuf/
hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/
hbase-server/src/main/java/o...
Added: hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto (added)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto Wed Nov 21 23:11:54 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a protocol to perform multi row transactions.
+ * See BaseRowProcessorEndpoint for the implementation.
+ * See HRegion#processRowsWithLocks() for details.
+ */
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "RowProcessorProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message RowProcessorRequest {
+ required string rowProcessorClassName = 1;
+ optional string rowProcessorInitializerMessageName = 2;
+ optional bytes rowProcessorInitializerMessage = 3;
+}
+
+message RowProcessorResult {
+ required bytes rowProcessorResult = 1;
+}
+
+service RowProcessorService {
+ rpc process (RowProcessorRequest) returns (RowProcessorResult);
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Wed Nov 21 23:11:54 2012
@@ -678,9 +678,10 @@ public class AggregationClient {
final AggregateArgument.Builder requestBuilder =
AggregateArgument.newBuilder();
requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
- if (ci.columnInterpreterSpecificData() != null) {
- requestBuilder.setInterpreterSpecificBytes(
- ci.columnInterpreterSpecificData());
+ ByteString columnInterpreterSpecificData = null;
+ if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData())
+ != null) {
+ requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData);
}
requestBuilder.setScan(ProtobufUtil.toScan(scan));
return requestBuilder.build();
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java Wed Nov 21 23:11:54 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.regionserver.RowProcessor;
+
+import com.google.protobuf.Message;
+/**
+ * Convenience class that is used to make RowProcessorEndpoint invocations.
+ * For example usage, refer to TestRowProcessorEndpoint
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RowProcessorClient {
+ public static <S extends Message, T extends Message>
+ RowProcessorRequest getRowProcessorPB(RowProcessor<S,T> r)
+ throws IOException {
+ final RowProcessorRequest.Builder requestBuilder =
+ RowProcessorRequest.newBuilder();
+ requestBuilder.setRowProcessorClassName(r.getClass().getName());
+ S s = r.getRequestData();
+ if (s != null) {
+ requestBuilder.setRowProcessorInitializerMessageName(s.getClass().getName());
+ requestBuilder.setRowProcessorInitializerMessage(s.toByteString());
+ }
+ return requestBuilder.build();
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java Wed Nov 21 23:11:54 2012
@@ -18,21 +18,35 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RowProcessor;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
/**
* This class demonstrates how to implement atomic read-modify-writes
* using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
- implements RowProcessorProtocol {
-
+public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
+extends RowProcessorService implements CoprocessorService, Coprocessor {
+ private RegionCoprocessorEnvironment env;
/**
* Pass a processor to HRegion to process multiple rows atomically.
*
@@ -42,16 +56,93 @@ public abstract class BaseRowProcessorEn
*
* See {@link TestRowProcessorEndpoint} for example.
*
- * @param processor The object defines the read-modify-write procedure
- * @return The processing result
+ * The request contains information for constructing processor
+ * (see {@link #constructRowProcessorFromRequest}). The processor object defines
+ * the read-modify-write procedure.
*/
@Override
- public <T> T process(RowProcessor<T> processor)
- throws IOException {
- HRegion region =
- ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
- region.processRowsWithLocks(processor);
- return processor.getResult();
+ public void process(RpcController controller, RowProcessorRequest request,
+ RpcCallback<RowProcessorResult> done) {
+ RowProcessorResult resultProto = null;
+ try {
+ RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
+ HRegion region = env.getRegion();
+ region.processRowsWithLocks(processor);
+ T result = processor.getResult();
+ RowProcessorResult.Builder b = RowProcessorResult.newBuilder();
+ b.setRowProcessorResult(result.toByteString());
+ resultProto = b.build();
+ } catch (Exception e) {
+ ResponseConverter.setControllerException(controller, new IOException(e));
+ }
+ done.run(resultProto);
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ /**
+ * Stores a reference to the coprocessor environment provided by the
+ * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+ * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded
+ * on a table region, so always expects this to be an instance of
+ * {@link RegionCoprocessorEnvironment}.
+ * @param env the environment provided by the coprocessor host
+ * @throws IOException if the provided environment is not an instance of
+ * {@code RegionCoprocessorEnvironment}
+ */
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) {
+ this.env = (RegionCoprocessorEnvironment)env;
+ } else {
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // nothing to do
}
+ @SuppressWarnings("unchecked")
+ RowProcessor<S,T> constructRowProcessorFromRequest(RowProcessorRequest request)
+ throws IOException {
+ String className = request.getRowProcessorClassName();
+ Class<?> cls;
+ try {
+ cls = Class.forName(className);
+ RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
+ if (request.hasRowProcessorInitializerMessageName()) {
+ Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
+ .asSubclass(Message.class);
+ Method m;
+ try {
+ m = imn.getMethod("parseFrom", ByteString.class);
+ } catch (SecurityException e) {
+ throw new IOException(e);
+ } catch (NoSuchMethodException e) {
+ throw new IOException(e);
+ }
+ S s;
+ try {
+ s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
+ } catch (IllegalArgumentException e) {
+ throw new IOException(e);
+ } catch (InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ ci.initialize(s);
+ }
+ return ci;
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (InstantiationException e) {
+ throw new IOException(e);
+ } catch (IllegalAccessException e) {
+ throw new IOException(e);
+ }
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Wed Nov 21 23:11:54 2012
@@ -119,7 +119,7 @@ public interface ColumnInterpreter<T, S>
/**
* This method should return any additional data that is needed on the
* server side to construct the ColumnInterpreter. The server
- * will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)}
+ * will pass this to the {@link #initialize(ByteString)}
* method. If there is no ColumnInterpreter specific data (for e.g.,
* {@link LongColumnInterpreter}) then null should be returned.
* @return the PB message
@@ -161,4 +161,4 @@ public interface ColumnInterpreter<T, S>
* @return cast
*/
T castToCellType(S response);
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java Wed Nov 21 23:11:54 2012
@@ -23,15 +23,13 @@ import java.util.UUID;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import com.google.protobuf.Message;
+
/**
* Base class for RowProcessor with some default implementations.
*/
-public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
-
- @Override
- public T getResult() {
- return null;
- }
+public abstract class BaseRowProcessor<S extends Message,T extends Message>
+implements RowProcessor<S,T> {
@Override
public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Nov 21 23:11:54 2012
@@ -4302,7 +4302,7 @@ public class HRegion implements HeapSize
*
* @param processor The object defines the reads and writes to a row.
*/
- public void processRowsWithLocks(RowProcessor<?> processor)
+ public void processRowsWithLocks(RowProcessor<?,?> processor)
throws IOException {
processRowsWithLocks(processor, rowProcessorTimeout);
}
@@ -4314,7 +4314,7 @@ public class HRegion implements HeapSize
* @param timeout The timeout of the processor.process() execution
* Use a negative number to switch off the time bound
*/
- public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
+ public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
throws IOException {
for (byte[] row : processor.getRowsToLock()) {
@@ -4453,7 +4453,7 @@ public class HRegion implements HeapSize
}
}
- private void doProcessRowWithTimeout(final RowProcessor<?> processor,
+ private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
final long now,
final HRegion region,
final List<KeyValue> mutations,
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java Wed Nov 21 23:11:54 2012
@@ -27,13 +27,16 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
*/
-class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
+class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
+MultiRowMutationProcessorResponse> {
Collection<byte[]> rowsToLock;
Collection<Mutation> mutations;
@@ -52,6 +55,11 @@ class MultiRowMutationProcessor extends
public boolean readOnly() {
return false;
}
+
+ @Override
+ public MultiRowMutationProcessorResponse getResult() {
+ return MultiRowMutationProcessorResponse.getDefaultInstance();
+ }
@Override
public void process(long now,
@@ -123,4 +131,13 @@ class MultiRowMutationProcessor extends
}
}
+ @Override
+ public MultiRowMutationProcessorRequest getRequestData() {
+ return MultiRowMutationProcessorRequest.getDefaultInstance();
+ }
+
+ @Override
+ public void initialize(MultiRowMutationProcessorRequest msg) {
+ //nothing
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Nov 21 23:11:54 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -38,10 +41,12 @@ import org.apache.hadoop.hbase.regionser
* This class performs scans and generates mutations and WAL edits.
* The locks and MVCC will be handled by HRegion.
*
- * The generic type parameter T is the return type of
- * RowProcessor.getResult().
+ * The RowProcessor user code could have data that needs to be
+ * sent across for proper initialization at the server side. The generic type
+ * parameter S is the type of the request data sent to the server.
+ * The generic type parameter T is the return type of RowProcessor.getResult().
*/
-public interface RowProcessor<T> {
+public interface RowProcessor<S extends Message, T extends Message> {
/**
* Rows to lock while operation.
@@ -51,7 +56,9 @@ public interface RowProcessor<T> {
Collection<byte[]> getRowsToLock();
/**
- * Obtain the processing result
+ * Obtain the processing result. All row processor implementations must
+ * implement this, even if the method is simply returning an empty
+ * Message.
*/
T getResult();
@@ -108,4 +115,22 @@ public interface RowProcessor<T> {
* @return The name of the processor
*/
String getName();
-}
+
+ /**
+ * This method should return any additional data that is needed on the
+ * server side to construct the RowProcessor. The server will pass this to
+ * the {@link #initialize(Message)} method. If there is no RowProcessor
+ * specific data then null should be returned.
+ * @return the PB message
+ * @throws IOException
+ */
+ S getRequestData() throws IOException;
+
+ /**
+ * This method should initialize any field(s) of the RowProcessor from
+ * the passed, already-parsed message (used on the server side).
+ * @param msg
+ * @throws IOException
+ */
+ void initialize(S msg) throws IOException;
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Wed Nov 21 23:11:54 2012
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.coproces
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,27 +36,39 @@ import org.apache.hadoop.hbase.MediumTes
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
import com.sun.org.apache.commons.logging.Log;
import com.sun.org.apache.commons.logging.LogFactory;
@@ -100,7 +110,7 @@ public class TestRowProcessorEndpoint {
@BeforeClass
public static void setupBeforeClass() throws Exception {
Configuration conf = util.getConfiguration();
- conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
RowProcessorEndpoint.class.getName());
conf.setInt("hbase.client.retries.number", 1);
conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
@@ -138,12 +148,18 @@ public class TestRowProcessorEndpoint {
@Test
public void testDoubleScan() throws Throwable {
prepareTestData();
- RowProcessorProtocol protocol =
- table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
- Set<String> result = protocol.process(processor);
-
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ RowProcessorResult protoResult = service.process(null, request);
+ FriendsOfFriendsProcessorResponse response =
+ FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+ Set<String> result = new HashSet<String>();
+ result.addAll(response.getResultList());
Set<String> expected =
new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
Get get = new Get(ROW);
@@ -176,12 +192,17 @@ public class TestRowProcessorEndpoint {
}
private int incrementCounter(HTable table) throws Throwable {
- RowProcessorProtocol protocol =
- table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
RowProcessorEndpoint.IncrementCounterProcessor processor =
new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
- int counterValue = protocol.process(processor);
- return counterValue;
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ RowProcessorResult protoResult = service.process(null, request);
+ IncCounterProcessorResponse response = IncCounterProcessorResponse
+ .parseFrom(protoResult.getRowProcessorResult());
+ Integer result = response.getResponse();
+ return result;
}
private void concurrentExec(
@@ -234,23 +255,27 @@ public class TestRowProcessorEndpoint {
}
private void swapRows(HTable table) throws Throwable {
- RowProcessorProtocol protocol =
- table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
RowProcessorEndpoint.RowSwapProcessor processor =
new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
- protocol.process(processor);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ service.process(null, request);
}
@Test
public void testTimeout() throws Throwable {
prepareTestData();
- RowProcessorProtocol protocol =
- table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
RowProcessorEndpoint.TimeoutProcessor processor =
new RowProcessorEndpoint.TimeoutProcessor(ROW);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
boolean exceptionCaught = false;
try {
- protocol.process(processor);
+ service.process(null, request);
} catch (Exception e) {
exceptionCaught = true;
}
@@ -264,11 +289,11 @@ public class TestRowProcessorEndpoint {
* We define the RowProcessors as the inner class of the endpoint.
* So they can be loaded with the endpoint on the coprocessor.
*/
- public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
- implements RowProcessorProtocol {
-
+ public static class RowProcessorEndpoint<S extends Message,T extends Message>
+ extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
public static class IncrementCounterProcessor extends
- BaseRowProcessor<Integer> implements Writable {
+ BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
+ IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
int counter = 0;
byte[] row = new byte[0];
@@ -288,8 +313,10 @@ public class TestRowProcessorEndpoint {
}
@Override
- public Integer getResult() {
- return counter;
+ public IncCounterProcessorResponse getResult() {
+ IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
+ i.setResponse(counter);
+ return i.build();
}
@Override
@@ -330,21 +357,22 @@ public class TestRowProcessorEndpoint {
}
@Override
- public void readFields(DataInput in) throws IOException {
- this.row = Bytes.readByteArray(in);
- this.counter = in.readInt();
+ public IncCounterProcessorRequest getRequestData() throws IOException {
+ IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
+ builder.setCounter(counter);
+ builder.setRow(ByteString.copyFrom(row));
+ return builder.build();
}
@Override
- public void write(DataOutput out) throws IOException {
- Bytes.writeByteArray(out, row);
- out.writeInt(counter);
+ public void initialize(IncCounterProcessorRequest msg) {
+ this.row = msg.getRow().toByteArray();
+ this.counter = msg.getCounter();
}
-
}
public static class FriendsOfFriendsProcessor extends
- BaseRowProcessor<Set<String>> implements Writable {
+ BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
byte[] row = null;
byte[] person = null;
final Set<String> result = new HashSet<String>();
@@ -366,8 +394,11 @@ public class TestRowProcessorEndpoint {
}
@Override
- public Set<String> getResult() {
- return result;
+ public FriendsOfFriendsProcessorResponse getResult() {
+ FriendsOfFriendsProcessorResponse.Builder builder =
+ FriendsOfFriendsProcessorResponse.newBuilder();
+ builder.addAllResult(result);
+ return builder.build();
}
@Override
@@ -405,29 +436,28 @@ public class TestRowProcessorEndpoint {
}
@Override
- public void readFields(DataInput in) throws IOException {
- this.person = Bytes.readByteArray(in);
- this.row = Bytes.readByteArray(in);
- int size = in.readInt();
- result.clear();
- for (int i = 0; i < size; ++i) {
- result.add(Text.readString(in));
- }
+ public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
+ FriendsOfFriendsProcessorRequest.Builder builder =
+ FriendsOfFriendsProcessorRequest.newBuilder();
+ builder.setPerson(ByteString.copyFrom(person));
+ builder.setRow(ByteString.copyFrom(row));
+ builder.addAllResult(result);
+ FriendsOfFriendsProcessorRequest f = builder.build();
+ return f;
}
@Override
- public void write(DataOutput out) throws IOException {
- Bytes.writeByteArray(out, person);
- Bytes.writeByteArray(out, row);
- out.writeInt(result.size());
- for (String s : result) {
- Text.writeString(out, s);
- }
+ public void initialize(FriendsOfFriendsProcessorRequest request)
+ throws IOException {
+ this.person = request.getPerson().toByteArray();
+ this.row = request.getRow().toByteArray();
+ result.clear();
+ result.addAll(request.getResultList());
}
}
public static class RowSwapProcessor extends
- BaseRowProcessor<Set<String>> implements Writable {
+ BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
byte[] row1 = new byte[0];
byte[] row2 = new byte[0];
@@ -456,6 +486,11 @@ public class TestRowProcessorEndpoint {
}
@Override
+ public RowSwapProcessorResponse getResult() {
+ return RowSwapProcessorResponse.getDefaultInstance();
+ }
+
+ @Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
@@ -502,25 +537,27 @@ public class TestRowProcessorEndpoint {
}
@Override
- public void readFields(DataInput in) throws IOException {
- this.row1 = Bytes.readByteArray(in);
- this.row2 = Bytes.readByteArray(in);
+ public String getName() {
+ return "swap";
}
@Override
- public void write(DataOutput out) throws IOException {
- Bytes.writeByteArray(out, row1);
- Bytes.writeByteArray(out, row2);
+ public RowSwapProcessorRequest getRequestData() throws IOException {
+ RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
+ builder.setRow1(ByteString.copyFrom(row1));
+ builder.setRow2(ByteString.copyFrom(row2));
+ return builder.build();
}
@Override
- public String getName() {
- return "swap";
+ public void initialize(RowSwapProcessorRequest msg) {
+ this.row1 = msg.getRow1().toByteArray();
+ this.row2 = msg.getRow2().toByteArray();
}
}
public static class TimeoutProcessor extends
- BaseRowProcessor<Void> implements Writable {
+ BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
byte[] row = new byte[0];
@@ -539,6 +576,11 @@ public class TestRowProcessorEndpoint {
}
@Override
+ public TimeoutProcessorResponse getResult() {
+ return TimeoutProcessorResponse.getDefaultInstance();
+ }
+
+ @Override
public void process(long now, HRegion region,
List<KeyValue> mutations, WALEdit walEdit) throws IOException {
try {
@@ -555,18 +597,20 @@ public class TestRowProcessorEndpoint {
}
@Override
- public void readFields(DataInput in) throws IOException {
- this.row = Bytes.readByteArray(in);
+ public String getName() {
+ return "timeout";
}
@Override
- public void write(DataOutput out) throws IOException {
- Bytes.writeByteArray(out, row);
+ public TimeoutProcessorRequest getRequestData() throws IOException {
+ TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
+ builder.setRow(ByteString.copyFrom(row));
+ return builder.build();
}
@Override
- public String getName() {
- return "timeout";
+ public void initialize(TimeoutProcessorRequest msg) throws IOException {
+ this.row = msg.getRow().toByteArray();
}
}
Added: hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto (added)
+++ hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto Wed Nov 21 23:11:54 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "IncrementCounterProcessorTestProtos";
+option java_generate_equals_and_hash = true;
+
+message IncCounterProcessorRequest {
+ required bytes row = 1;
+ required int32 counter = 2;
+}
+
+message IncCounterProcessorResponse {
+ required int32 response = 1;
+}
+
+message FriendsOfFriendsProcessorRequest {
+ required bytes person = 1;
+ required bytes row = 2;
+ repeated string result = 3;
+}
+
+message FriendsOfFriendsProcessorResponse {
+ repeated string result = 1;
+}
+
+message RowSwapProcessorRequest {
+ required bytes row1 = 1;
+ required bytes row2 = 2;
+}
+
+message RowSwapProcessorResponse {
+}
+
+message TimeoutProcessorRequest {
+ required bytes row = 1;
+}
+
+message TimeoutProcessorResponse {
+}
\ No newline at end of file