Posted to commits@hbase.apache.org by te...@apache.org on 2012/11/22 00:11:56 UTC

svn commit: r1412356 [2/2] - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ hbase-server/src/main/java/o...

Added: hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto (added)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/RowProcessor.proto Wed Nov 21 23:11:54 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines a protocol to perform multi row transactions.
+ * See BaseRowProcessorEndpoint for the implementation.
+ * See HRegion#processRowsWithLocks() for details.
+ */
+option java_package = "org.apache.hadoop.hbase.protobuf.generated";
+option java_outer_classname = "RowProcessorProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message RowProcessorRequest {
+  required string rowProcessorClassName = 1;
+  optional string rowProcessorInitializerMessageName = 2;
+  optional bytes  rowProcessorInitializerMessage = 3;
+}
+
+message RowProcessorResult {
+  required bytes rowProcessorResult = 1;
+}
+
+service RowProcessorService {
+  rpc process (RowProcessorRequest) returns (RowProcessorResult);
+}
\ No newline at end of file
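
For orientation, a minimal sketch of what a populated RowProcessorRequest looks like; the class names below are hypothetical placeholders, and the RowProcessorClient added later in this commit builds the same message from a RowProcessor instance:

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;

    // Sketch only: the server loads rowProcessorClassName by reflection, then calls
    // parseFrom(rowProcessorInitializerMessage) on the named initializer message class.
    RowProcessorRequest request = RowProcessorRequest.newBuilder()
        .setRowProcessorClassName("com.example.MyRowProcessor")                  // placeholder
        .setRowProcessorInitializerMessageName("com.example.MyProcessorRequest") // placeholder
        .setRowProcessorInitializerMessage(ByteString.EMPTY) // serialized request data, if any
        .build();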

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Wed Nov 21 23:11:54 2012
@@ -678,9 +678,10 @@ public class AggregationClient {
     final AggregateArgument.Builder requestBuilder = 
         AggregateArgument.newBuilder();
     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    if (ci.columnInterpreterSpecificData() != null) {
-      requestBuilder.setInterpreterSpecificBytes(
-        ci.columnInterpreterSpecificData());
+    ByteString columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData()) 
+       != null) {
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData);
     }
     requestBuilder.setScan(ProtobufUtil.toScan(scan));
     return requestBuilder.build();

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/RowProcessorClient.java Wed Nov 21 23:11:54 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.regionserver.RowProcessor;
+
+import com.google.protobuf.Message;
+/**
+ * Convenience class used to make RowProcessorEndpoint invocations.
+ * For example usage, refer to TestRowProcessorEndpoint.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RowProcessorClient {
+  public static <S extends Message, T extends Message>
+  RowProcessorRequest getRowProcessorPB(RowProcessor<S,T> r)
+      throws IOException {
+    final RowProcessorRequest.Builder requestBuilder =
+        RowProcessorRequest.newBuilder();
+    requestBuilder.setRowProcessorClassName(r.getClass().getName());
+    S s = r.getRequestData();
+    if (s != null) {
+      requestBuilder.setRowProcessorInitializerMessageName(s.getClass().getName());
+      requestBuilder.setRowProcessorInitializerMessage(s.toByteString());
+    }
+    return requestBuilder.build();
+  }
+}
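
A usage sketch of the new client path, mirroring the calls in TestRowProcessorEndpoint further down; callRowProcessor is a hypothetical helper, not part of this commit:

    import com.google.protobuf.ByteString;
    import com.google.protobuf.Message;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
    import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
    import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
    import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
    import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
    import org.apache.hadoop.hbase.regionserver.RowProcessor;

    // Sketch only: invoke a RowProcessor endpoint on the region holding 'row' and
    // return the raw response bytes for the caller to parse with its response type.
    static <S extends Message, T extends Message> ByteString callRowProcessor(
        HTable table, byte[] row, RowProcessor<S, T> processor) throws Throwable {
      CoprocessorRpcChannel channel = table.coprocessorService(row);
      RowProcessorService.BlockingInterface service =
          RowProcessorService.newBlockingStub(channel);
      RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
      RowProcessorResult result = service.process(null, request);
      return result.getRowProcessorResult();
    }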

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java Wed Nov 21 23:11:54 2012
@@ -18,21 +18,35 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RowProcessor;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
 /**
  * This class demonstrates how to implement atomic read-modify-writes
  * using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
-    implements RowProcessorProtocol {
-
+public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message> 
+extends RowProcessorService implements CoprocessorService, Coprocessor {
+  private RegionCoprocessorEnvironment env;
   /**
    * Pass a processor to HRegion to process multiple rows atomically.
    * 
@@ -42,16 +56,93 @@ public abstract class BaseRowProcessorEn
    *
    * See {@link TestRowProcessorEndpoint} for example.
    *
-   * @param processor The object defines the read-modify-write procedure
-   * @return The processing result
+   * The request contains the information needed to construct the processor
+   * (see {@link #constructRowProcessorFromRequest}). The processor object defines
+   * the read-modify-write procedure.
    */
   @Override
-  public <T> T process(RowProcessor<T> processor)
-      throws IOException {
-    HRegion region =
-        ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
-    region.processRowsWithLocks(processor);
-    return processor.getResult();
+  public void process(RpcController controller, RowProcessorRequest request,
+      RpcCallback<RowProcessorResult> done) {
+    RowProcessorResult resultProto = null;
+    try {
+      RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
+      HRegion region = env.getRegion();
+      region.processRowsWithLocks(processor);
+      T result = processor.getResult();
+      RowProcessorResult.Builder b = RowProcessorResult.newBuilder();
+      b.setRowProcessorResult(result.toByteString()); 
+      resultProto = b.build();
+    } catch (Exception e) {
+      ResponseConverter.setControllerException(controller, new IOException(e));
+    }
+    done.run(resultProto);
+  }
+
+  @Override
+  public Service getService() {
+    return this;
+  }
+
+  /**
+   * Stores a reference to the coprocessor environment provided by the
+   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
+   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
+   * on a table region, so always expects this to be an instance of
+   * {@link RegionCoprocessorEnvironment}.
+   * @param env the environment provided by the coprocessor host
+   * @throws IOException if the provided environment is not an instance of
+   * {@code RegionCoprocessorEnvironment}
+   */
+  @Override
+  public void start(CoprocessorEnvironment env) throws IOException {
+    if (env instanceof RegionCoprocessorEnvironment) {
+      this.env = (RegionCoprocessorEnvironment)env;
+    } else {
+      throw new CoprocessorException("Must be loaded on a table region!");
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) throws IOException {
+    // nothing to do
   }
 
+  @SuppressWarnings("unchecked")
+  RowProcessor<S,T> constructRowProcessorFromRequest(RowProcessorRequest request)
+      throws IOException {
+    String className = request.getRowProcessorClassName();
+    Class<?> cls;
+    try {
+      cls = Class.forName(className);
+      RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
+      if (request.hasRowProcessorInitializerMessageName()) {
+        Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
+            .asSubclass(Message.class);
+        Method m;
+        try {
+          m = imn.getMethod("parseFrom", ByteString.class);
+        } catch (SecurityException e) {
+          throw new IOException(e);
+        } catch (NoSuchMethodException e) {
+          throw new IOException(e);
+        }
+        S s;
+        try {
+          s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
+        } catch (IllegalArgumentException e) {
+          throw new IOException(e);
+        } catch (InvocationTargetException e) {
+          throw new IOException(e);
+        }
+        ci.initialize(s);
+      }
+      return ci;
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
 }
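
Since BaseRowProcessorEndpoint is abstract, a deployment only needs an (optionally empty) concrete subclass registered as a region coprocessor; a minimal sketch with a placeholder class name, mirroring the test endpoint further down:

    import com.google.protobuf.Message;
    import org.apache.hadoop.hbase.coprocessor.BaseRowProcessorEndpoint;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;

    // Sketch only: load this class via CoprocessorHost.REGION_COPROCESSOR_CONF_KEY
    // (as the test setup below does) or per-table coprocessor attributes. The
    // RowProcessor to run is named in each RowProcessorRequest and instantiated by
    // reflection in constructRowProcessorFromRequest().
    public class MyRowProcessorEndpoint<S extends Message, T extends Message>
        extends BaseRowProcessorEndpoint<S, T> implements CoprocessorService {
    }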

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Wed Nov 21 23:11:54 2012
@@ -119,7 +119,7 @@ public interface ColumnInterpreter<T, S>
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the ColumnInterpreter. The server
-   * will pass this to the {@link #initialize(org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.ColumnInterpreter)}
+   * will pass this to the {@link #initialize(ByteString)}
    * method. If there is no ColumnInterpreter specific data (for e.g.,
    * {@link LongColumnInterpreter}) then null should be returned.
    * @return the PB message
@@ -161,4 +161,4 @@ public interface ColumnInterpreter<T, S>
    * @return cast
    */
   T castToCellType(S response);
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java Wed Nov 21 23:11:54 2012
@@ -23,15 +23,13 @@ import java.util.UUID;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.protobuf.Message;
+
 /**
  * Base class for RowProcessor with some default implementations.
  */
-public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
-
-  @Override
-  public T getResult() {
-    return null;
-  }
+public abstract class BaseRowProcessor<S extends Message,T extends Message> 
+implements RowProcessor<S,T> {
 
   @Override
   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Nov 21 23:11:54 2012
@@ -4302,7 +4302,7 @@ public class HRegion implements HeapSize
    *
    * @param processor The object defines the reads and writes to a row.
    */
-  public void processRowsWithLocks(RowProcessor<?> processor)
+  public void processRowsWithLocks(RowProcessor<?,?> processor)
       throws IOException {
     processRowsWithLocks(processor, rowProcessorTimeout);
   }
@@ -4314,7 +4314,7 @@ public class HRegion implements HeapSize
    * @param timeout The timeout of the processor.process() execution
    *                Use a negative number to switch off the time bound
    */
-  public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
+  public void processRowsWithLocks(RowProcessor<?,?> processor, long timeout)
       throws IOException {
 
     for (byte[] row : processor.getRowsToLock()) {
@@ -4453,7 +4453,7 @@ public class HRegion implements HeapSize
     }
   }
 
-  private void doProcessRowWithTimeout(final RowProcessor<?> processor,
+  private void doProcessRowWithTimeout(final RowProcessor<?,?> processor,
                                        final long now,
                                        final HRegion region,
                                        final List<KeyValue> mutations,
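
For context, the signatures changed here are called like this on the server side (a sketch; 'region' and 'processor' are assumed to be in scope):

    // Default time bound: the test setup below configures
    // hbase.hregion.row.processor.timeout to 1000.
    region.processRowsWithLocks(processor);
    // Explicit bound; per the javadoc above, a negative value switches off the time bound.
    region.processRowsWithLocks(processor, 1000L);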

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java Wed Nov 21 23:11:54 2012
@@ -27,13 +27,16 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProcessorProtos.MultiRowMutationProcessorResponse;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * A <code>MultiRowProcessor</code> that performs multiple puts and deletes.
  */
-class MultiRowMutationProcessor extends BaseRowProcessor<Void> {
+class MultiRowMutationProcessor extends BaseRowProcessor<MultiRowMutationProcessorRequest,
+MultiRowMutationProcessorResponse> {
   Collection<byte[]> rowsToLock;
   Collection<Mutation> mutations;
 
@@ -52,6 +55,11 @@ class MultiRowMutationProcessor extends 
   public boolean readOnly() {
     return false;
   }
+  
+  @Override
+  public MultiRowMutationProcessorResponse getResult() {
+    return MultiRowMutationProcessorResponse.getDefaultInstance();
+  }
 
   @Override
   public void process(long now,
@@ -123,4 +131,13 @@ class MultiRowMutationProcessor extends 
     }
   }
 
+  @Override
+  public MultiRowMutationProcessorRequest getRequestData() {
+    return MultiRowMutationProcessorRequest.getDefaultInstance();
+  }
+
+  @Override
+  public void initialize(MultiRowMutationProcessorRequest msg) {
+    //nothing
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java Wed Nov 21 23:11:54 2012
@@ -27,6 +27,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
+
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 
@@ -38,10 +41,12 @@ import org.apache.hadoop.hbase.regionser
  * This class performs scans and generates mutations and WAL edits.
  * The locks and MVCC will be handled by HRegion.
  *
- * The generic type parameter T is the return type of
- * RowProcessor.getResult().
+ * A RowProcessor implementation may carry data that needs to be
+ * sent to the server for proper initialization there. The generic type
+ * parameter S is the type of the request data sent to the server.
+ * The generic type parameter T is the return type of RowProcessor.getResult().
  */
-public interface RowProcessor<T> {
+public interface RowProcessor<S extends Message, T extends Message> {
 
   /**
    * Rows to lock while operation.
@@ -51,7 +56,9 @@ public interface RowProcessor<T> {
   Collection<byte[]> getRowsToLock();
 
   /**
-   * Obtain the processing result
+   * Obtain the processing result. All row processor implementations must
+   * implement this, even if the method is simply returning an empty
+   * Message.
    */
   T getResult();
 
@@ -108,4 +115,22 @@ public interface RowProcessor<T> {
    * @return The name of the processor
    */
   String getName();
-}
+
+  /**
+   * This method should return any additional data that is needed on the
+   * server side to construct the RowProcessor. The server will pass this to
+   * the {@link #initialize(ByteString)} method. If there is no RowProcessor
+   * specific data then null should be returned.
+   * @return the PB message
+   * @throws IOException
+   */
+  S getRequestData() throws IOException;
+
+  /**
+   * This method should initialize any field(s) of the RowProcessor by
+   * parsing the passed message (used on the server side).
+   * @param msg
+   * @throws IOException
+   */
+  void initialize(S msg) throws IOException;
+}
\ No newline at end of file
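
A minimal sketch of the two new methods for a processor whose only server-side state is a row key, using the test's TimeoutProcessorRequest as the request type; the full implementations are in TestRowProcessorEndpoint below:

    import java.io.IOException;
    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;

    private byte[] row;  // the processor's only state in this sketch

    // Ship the row key to the server ...
    @Override
    public TimeoutProcessorRequest getRequestData() throws IOException {
      return TimeoutProcessorRequest.newBuilder()
          .setRow(ByteString.copyFrom(row))
          .build();
    }

    // ... and restore it there before process() runs.
    @Override
    public void initialize(TimeoutProcessorRequest msg) throws IOException {
      this.row = msg.getRow().toByteArray();
    }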

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java?rev=1412356&r1=1412355&r2=1412356&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java Wed Nov 21 23:11:54 2012
@@ -21,8 +21,6 @@ package org.apache.hadoop.hbase.coproces
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,27 +36,39 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorResult;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
 import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
 
@@ -100,7 +110,7 @@ public class TestRowProcessorEndpoint {
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     Configuration conf = util.getConfiguration();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
         RowProcessorEndpoint.class.getName());
     conf.setInt("hbase.client.retries.number", 1);
     conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
@@ -138,12 +148,18 @@ public class TestRowProcessorEndpoint {
   @Test
   public void testDoubleScan() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
         new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
-    Set<String> result = protocol.process(processor);
-
+    RowProcessorService.BlockingInterface service = 
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    FriendsOfFriendsProcessorResponse response = 
+        FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+    Set<String> result = new HashSet<String>();
+    result.addAll(response.getResultList()); 
     Set<String> expected =
       new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
     Get get = new Get(ROW);
@@ -176,12 +192,17 @@ public class TestRowProcessorEndpoint {
   }
 
   private int incrementCounter(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.IncrementCounterProcessor processor =
         new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
-    int counterValue = protocol.process(processor);
-    return counterValue;
+    RowProcessorService.BlockingInterface service = 
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    RowProcessorResult protoResult = service.process(null, request);
+    IncCounterProcessorResponse response = IncCounterProcessorResponse
+        .parseFrom(protoResult.getRowProcessorResult());
+    Integer result = response.getResponse();
+    return result;
   }
 
   private void concurrentExec(
@@ -234,23 +255,27 @@ public class TestRowProcessorEndpoint {
   }
 
   private void swapRows(HTable table) throws Throwable {
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.RowSwapProcessor processor =
         new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
-    protocol.process(processor);
+    RowProcessorService.BlockingInterface service = 
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    service.process(null, request);
   }
 
   @Test
   public void testTimeout() throws Throwable {
     prepareTestData();
-    RowProcessorProtocol protocol =
-        table.coprocessorProxy(RowProcessorProtocol.class, ROW);
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
     RowProcessorEndpoint.TimeoutProcessor processor =
         new RowProcessorEndpoint.TimeoutProcessor(ROW);
+    RowProcessorService.BlockingInterface service = 
+        RowProcessorService.newBlockingStub(channel);
+    RowProcessorRequest request = RowProcessorClient.getRowProcessorPB(processor);
     boolean exceptionCaught = false;
     try {
-      protocol.process(processor);
+      service.process(null, request);
     } catch (Exception e) {
       exceptionCaught = true;
     }
@@ -264,11 +289,11 @@ public class TestRowProcessorEndpoint {
    * We define the RowProcessors as the inner class of the endpoint.
    * So they can be loaded with the endpoint on the coprocessor.
    */
-  public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint
-      implements RowProcessorProtocol {
-
+  public static class RowProcessorEndpoint<S extends Message,T extends Message>
+  extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
     public static class IncrementCounterProcessor extends
-        BaseRowProcessor<Integer> implements Writable {
+        BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
+        IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
       int counter = 0;
       byte[] row = new byte[0];
 
@@ -288,8 +313,10 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public Integer getResult() {
-        return counter;
+      public IncCounterProcessorResponse getResult() {
+        IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
+        i.setResponse(counter);
+        return i.build();
       }
 
       @Override
@@ -330,21 +357,22 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row = Bytes.readByteArray(in);
-        this.counter = in.readInt();
+      public IncCounterProcessorRequest getRequestData() throws IOException {
+        IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
+        builder.setCounter(counter);
+        builder.setRow(ByteString.copyFrom(row));
+        return builder.build();
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row);
-        out.writeInt(counter);
+      public void initialize(IncCounterProcessorRequest msg) {
+        this.row = msg.getRow().toByteArray();
+        this.counter = msg.getCounter();
       }
-
     }
 
     public static class FriendsOfFriendsProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
       byte[] row = null;
       byte[] person = null;
       final Set<String> result = new HashSet<String>();
@@ -366,8 +394,11 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public Set<String> getResult() {
-        return result;
+      public FriendsOfFriendsProcessorResponse getResult() {
+        FriendsOfFriendsProcessorResponse.Builder builder = 
+            FriendsOfFriendsProcessorResponse.newBuilder();
+        builder.addAllResult(result);
+        return builder.build();
       }
 
       @Override
@@ -405,29 +436,28 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.person = Bytes.readByteArray(in);
-        this.row = Bytes.readByteArray(in);
-        int size = in.readInt();
-        result.clear();
-        for (int i = 0; i < size; ++i) {
-          result.add(Text.readString(in));
-        }
+      public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
+        FriendsOfFriendsProcessorRequest.Builder builder =
+            FriendsOfFriendsProcessorRequest.newBuilder();
+        builder.setPerson(ByteString.copyFrom(person));
+        builder.setRow(ByteString.copyFrom(row));
+        builder.addAllResult(result);
+        FriendsOfFriendsProcessorRequest f = builder.build();
+        return f;
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, person);
-        Bytes.writeByteArray(out, row);
-        out.writeInt(result.size());
-        for (String s : result) {
-          Text.writeString(out, s);
-        }
+      public void initialize(FriendsOfFriendsProcessorRequest request) 
+          throws IOException {
+        this.person = request.getPerson().toByteArray();
+        this.row = request.getRow().toByteArray();
+        result.clear();
+        result.addAll(request.getResultList());
       }
     }
 
     public static class RowSwapProcessor extends
-        BaseRowProcessor<Set<String>> implements Writable {
+        BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
       byte[] row1 = new byte[0];
       byte[] row2 = new byte[0];
 
@@ -456,6 +486,11 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
+      public RowSwapProcessorResponse getResult() {
+        return RowSwapProcessorResponse.getDefaultInstance();
+      }
+
+      @Override
       public void process(long now, HRegion region,
           List<KeyValue> mutations, WALEdit walEdit) throws IOException {
 
@@ -502,25 +537,27 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row1 = Bytes.readByteArray(in);
-        this.row2 = Bytes.readByteArray(in);
+      public String getName() {
+        return "swap";
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row1);
-        Bytes.writeByteArray(out, row2);
+      public RowSwapProcessorRequest getRequestData() throws IOException {
+        RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
+        builder.setRow1(ByteString.copyFrom(row1));
+        builder.setRow2(ByteString.copyFrom(row2));
+        return builder.build();
       }
 
       @Override
-      public String getName() {
-        return "swap";
+      public void initialize(RowSwapProcessorRequest msg) {
+        this.row1 = msg.getRow1().toByteArray();
+        this.row2 = msg.getRow2().toByteArray();
       }
     }
 
     public static class TimeoutProcessor extends
-        BaseRowProcessor<Void> implements Writable {
+        BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
 
       byte[] row = new byte[0];
 
@@ -539,6 +576,11 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
+      public TimeoutProcessorResponse getResult() {
+        return TimeoutProcessorResponse.getDefaultInstance();
+      }
+
+      @Override
       public void process(long now, HRegion region,
           List<KeyValue> mutations, WALEdit walEdit) throws IOException {
         try {
@@ -555,18 +597,20 @@ public class TestRowProcessorEndpoint {
       }
 
       @Override
-      public void readFields(DataInput in) throws IOException {
-        this.row = Bytes.readByteArray(in);
+      public String getName() {
+        return "timeout";
       }
 
       @Override
-      public void write(DataOutput out) throws IOException {
-        Bytes.writeByteArray(out, row);
+      public TimeoutProcessorRequest getRequestData() throws IOException {
+        TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
+        builder.setRow(ByteString.copyFrom(row));
+        return builder.build();
       }
 
       @Override
-      public String getName() {
-        return "timeout";
+      public void initialize(TimeoutProcessorRequest msg) throws IOException {
+        this.row = msg.getRow().toByteArray();
       }
     }
 

Added: hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto?rev=1412356&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto (added)
+++ hbase/trunk/hbase-server/src/test/protobuf/IncrementCounterProcessor.proto Wed Nov 21 23:11:54 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.protobuf.generated";
+option java_outer_classname = "IncrementCounterProcessorTestProtos";
+option java_generate_equals_and_hash = true;
+
+message IncCounterProcessorRequest {
+  required bytes row = 1;
+  required int32 counter = 2;
+}
+
+message IncCounterProcessorResponse {
+  required int32 response = 1;
+}
+
+message FriendsOfFriendsProcessorRequest {
+  required bytes person = 1;
+  required bytes row = 2;
+  repeated string result = 3;
+}
+
+message FriendsOfFriendsProcessorResponse {
+  repeated string result = 1;
+}
+
+message RowSwapProcessorRequest {
+  required bytes row1 = 1;
+  required bytes row2 = 2;
+}
+
+message RowSwapProcessorResponse {
+}
+
+message TimeoutProcessorRequest {
+  required bytes row = 1;
+}
+
+message TimeoutProcessorResponse {
+}
\ No newline at end of file