Posted to commits@hbase.apache.org by te...@apache.org on 2013/01/05 14:22:05 UTC

svn commit: r1429278 - in /hbase/trunk: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ hbase-protocol/src/main/protobuf/ hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ hbase-server/src/main/java/org/apa...

Author: tedyu
Date: Sat Jan  5 13:22:04 2013
New Revision: 1429278

URL: http://svn.apache.org/viewvc?rev=1429278&view=rev
Log:
HBASE-7405 Enforce PB ser/de for Aggregate protocol and associated ColumnInterpreter user code bits (Devaraj Das)
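
For context, this patch reworks ColumnInterpreter into an abstract class with
three extra protobuf Message type parameters; LongColumnInterpreter binds them
as ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg>. A minimal
client-side sketch (not from the patch; table "myTable" and column f:q are
assumed):

    Configuration conf = HBaseConfiguration.create();
    AggregationClient aggClient = new AggregationClient(conf);
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));
    // values now travel as PB messages; for LongColumnInterpreter the
    // cell-value and promoted-value messages are both LongMsg
    Long max = aggClient.max(Bytes.toBytes("myTable"),
        new LongColumnInterpreter(), scan);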


Modified:
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java Sat Jan  5 13:22:04 2013
@@ -11303,6 +11303,685 @@ public final class HBaseProtos {
     // @@protoc_insertion_point(class_scope:NameInt64Pair)
   }
   
+  public interface EmptyMsgOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+  }
+  public static final class EmptyMsg extends
+      com.google.protobuf.GeneratedMessage
+      implements EmptyMsgOrBuilder {
+    // Use EmptyMsg.newBuilder() to construct.
+    private EmptyMsg(Builder builder) {
+      super(builder);
+    }
+    private EmptyMsg(boolean noInit) {}
+    
+    private static final EmptyMsg defaultInstance;
+    public static EmptyMsg getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public EmptyMsg getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_EmptyMsg_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_EmptyMsg_fieldAccessorTable;
+    }
+    
+    private void initFields() {
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg other = (org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg) obj;
+      
+      boolean result = true;
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+    
+    @java.lang.Override
+    public int hashCode() {
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      return hash;
+    }
+    
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsgOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_EmptyMsg_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_EmptyMsg_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.getDescriptor();
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.getDefaultInstance();
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg build() {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg result = new org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg(this);
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.getDefaultInstance()) return this;
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      
+      // @@protoc_insertion_point(builder_scope:EmptyMsg)
+    }
+    
+    static {
+      defaultInstance = new EmptyMsg(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:EmptyMsg)
+  }
+  
+  public interface LongMsgOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // required int64 longMsg = 1;
+    boolean hasLongMsg();
+    long getLongMsg();
+  }
+  public static final class LongMsg extends
+      com.google.protobuf.GeneratedMessage
+      implements LongMsgOrBuilder {
+    // Use LongMsg.newBuilder() to construct.
+    private LongMsg(Builder builder) {
+      super(builder);
+    }
+    private LongMsg(boolean noInit) {}
+    
+    private static final LongMsg defaultInstance;
+    public static LongMsg getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public LongMsg getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_LongMsg_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_LongMsg_fieldAccessorTable;
+    }
+    
+    private int bitField0_;
+    // required int64 longMsg = 1;
+    public static final int LONGMSG_FIELD_NUMBER = 1;
+    private long longMsg_;
+    public boolean hasLongMsg() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getLongMsg() {
+      return longMsg_;
+    }
+    
+    private void initFields() {
+      longMsg_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasLongMsg()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, longMsg_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, longMsg_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg)) {
+        return super.equals(obj);
+      }
+      org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg other = (org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg) obj;
+      
+      boolean result = true;
+      result = result && (hasLongMsg() == other.hasLongMsg());
+      if (hasLongMsg()) {
+        result = result && (getLongMsg()
+            == other.getLongMsg());
+      }
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+    
+    @java.lang.Override
+    public int hashCode() {
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasLongMsg()) {
+        hash = (37 * hash) + LONGMSG_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getLongMsg());
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      return hash;
+    }
+    
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsgOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_LongMsg_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.internal_static_LongMsg_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        longMsg_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.getDescriptor();
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg getDefaultInstanceForType() {
+        return org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.getDefaultInstance();
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg build() {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg buildPartial() {
+        org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg result = new org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.longMsg_ = longMsg_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg) {
+          return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg other) {
+        if (other == org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.getDefaultInstance()) return this;
+        if (other.hasLongMsg()) {
+          setLongMsg(other.getLongMsg());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasLongMsg()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              longMsg_ = input.readInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required int64 longMsg = 1;
+      private long longMsg_ ;
+      public boolean hasLongMsg() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getLongMsg() {
+        return longMsg_;
+      }
+      public Builder setLongMsg(long value) {
+        bitField0_ |= 0x00000001;
+        longMsg_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearLongMsg() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        longMsg_ = 0L;
+        onChanged();
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:LongMsg)
+    }
+    
+    static {
+      defaultInstance = new LongMsg(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:LongMsg)
+  }
+  
   private static com.google.protobuf.Descriptors.Descriptor
     internal_static_TableSchema_descriptor;
   private static
@@ -11388,6 +12067,16 @@ public final class HBaseProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_NameInt64Pair_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_EmptyMsg_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_EmptyMsg_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_LongMsg_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_LongMsg_fieldAccessorTable;
   
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -11440,13 +12129,14 @@ public final class HBaseProtos {
       "value\030\002 \002(\t\",\n\rNameBytesPair\022\014\n\004name\030\001 \002" +
       "(\t\022\r\n\005value\030\002 \001(\014\"/\n\016BytesBytesPair\022\r\n\005f" +
       "irst\030\001 \002(\014\022\016\n\006second\030\002 \002(\014\",\n\rNameInt64P" +
-      "air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003*r\n\013Comp" +
+      "air\022\014\n\004name\030\001 \001(\t\022\r\n\005value\030\002 \001(\003\"\n\n\010Empt" +
+      "yMsg\"\032\n\007LongMsg\022\017\n\007longMsg\030\001 \002(\003*r\n\013Comp" +
       "areType\022\010\n\004LESS\020\000\022\021\n\rLESS_OR_EQUAL\020\001\022\t\n\005" +
       "EQUAL\020\002\022\r\n\tNOT_EQUAL\020\003\022\024\n\020GREATER_OR_EQU" +
       "AL\020\004\022\013\n\007GREATER\020\005\022\t\n\005NO_OP\020\006*_\n\007KeyType\022" +
       "\013\n\007MINIMUM\020\000\022\007\n\003PUT\020\004\022\n\n\006DELETE\020\010\022\021\n\rDEL" +
-      "ETE_COLUMN\020\014\022\021\n\rDELETE_FAMILY\020\016\022\014\n\007MAXIM" +
-      "UM\020\377\001B>\n*org.apache.hadoop.hbase.protobu",
+      "ETE_COLUMN\020\014\022\021\n\rDELETE_FAMILY\020\016\022\014\n\007MAXIM",
+      "UM\020\377\001B>\n*org.apache.hadoop.hbase.protobu" +
       "f.generatedB\013HBaseProtosH\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
@@ -11590,6 +12280,22 @@ public final class HBaseProtos {
               new java.lang.String[] { "Name", "Value", },
               org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair.class,
               org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameInt64Pair.Builder.class);
+          internal_static_EmptyMsg_descriptor =
+            getDescriptor().getMessageTypes().get(15);
+          internal_static_EmptyMsg_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_EmptyMsg_descriptor,
+              new java.lang.String[] { },
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.class,
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg.Builder.class);
+          internal_static_LongMsg_descriptor =
+            getDescriptor().getMessageTypes().get(16);
+          internal_static_LongMsg_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_LongMsg_descriptor,
+              new java.lang.String[] { "LongMsg", },
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.class,
+              org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg.Builder.class);
           return null;
         }
       };

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/hbase.proto Sat Jan  5 13:22:04 2013
@@ -268,3 +268,10 @@ message NameInt64Pair {
   optional string name = 1;
   optional int64 value = 2;
 }
+
+message EmptyMsg {
+}
+
+message LongMsg {
+  required int64 longMsg = 1;
+}
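
A quick illustration of the two new messages (a sketch, not part of the patch):
EmptyMsg is a placeholder for interpreters that need no initialization data,
while LongMsg wraps a single required int64:

    LongMsg msg = LongMsg.newBuilder().setLongMsg(42L).build();
    ByteString wire = msg.toByteString();      // what addFirstPart() ships
    LongMsg parsed = LongMsg.parseFrom(wire);  // decoded on the other side
    assert parsed.getLongMsg() == 42L;
    EmptyMsg empty = EmptyMsg.getDefaultInstance();  // nothing to set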

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java Sat Jan  5 13:22:04 2013
@@ -20,6 +20,10 @@
 package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -51,6 +55,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 /**
  * This client class is for invoking the aggregate functions deployed on the
@@ -98,7 +103,8 @@ public class AggregationClient {
    *           The caller is supposed to handle the exceptions as they are thrown
    *           and propagated to it.
    */
-  public <R, S> R max(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  R max(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MaxCallBack implements Batch.Callback<R> {
@@ -130,9 +136,9 @@ public class AggregationClient {
                 throw controller.getFailedOn();
               }
               if (response.getFirstPartCount() > 0) {
-                return ci.castToCellType(
-                          ci.parseResponseAsPromotedType(
-                              getBytesFromResponse(response.getFirstPart(0))));
+                ByteString b = response.getFirstPart(0);
+                Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
+                return ci.getCellValueFromProto(q);
               }
               return null;
             }
@@ -168,7 +174,8 @@ public class AggregationClient {
    * @return min val <R>
    * @throws Throwable
    */
-  public <R, S> R min(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  R min(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class MinCallBack implements Batch.Callback<R> {
@@ -202,9 +209,9 @@ public class AggregationClient {
                 throw controller.getFailedOn();
               }
               if (response.getFirstPartCount() > 0) {
-                return ci.castToCellType(
-                  ci.parseResponseAsPromotedType(
-                      getBytesFromResponse(response.getFirstPart(0))));
+                ByteString b = response.getFirstPart(0);
+                Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b);
+                return ci.getCellValueFromProto(q);
               }
               return null;
             }
@@ -231,8 +238,9 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> long rowCount(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  long rowCount(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class RowNumCallback implements Batch.Callback<Long> {
       private final AtomicLong rowCountL = new AtomicLong(0);
@@ -285,7 +293,8 @@ public class AggregationClient {
    * @return sum <S>
    * @throws Throwable
    */
-  public <R, S> S sum(final byte[] tableName, final ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message> 
+  S sum(final byte[] tableName, final ColumnInterpreter<R, S, P, Q, T> ci,
       final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     
@@ -320,8 +329,10 @@ public class AggregationClient {
               if (response.getFirstPartCount() == 0) {
                 return null;
               }
-              return ci.parseResponseAsPromotedType(
-                  getBytesFromResponse(response.getFirstPart(0)));
+              ByteString b = response.getFirstPart(0);
+              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+              S s = ci.getPromotedValueFromProto(t);
+              return s;
             }
           }, sumCallBack);
     } finally {
@@ -340,8 +351,9 @@ public class AggregationClient {
    * @param scan
    * @throws Throwable
    */
-  private <R, S> Pair<S, Long> getAvgArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<S, Long> getAvgArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class AvgCallBack implements Batch.Callback<Pair<S, Long>> {
       S sum = null;
@@ -379,8 +391,10 @@ public class AggregationClient {
               if (response.getFirstPartCount() == 0) {
                 return pair;
               }
-              pair.setFirst(ci.parseResponseAsPromotedType(
-                  getBytesFromResponse(response.getFirstPart(0))));
+              ByteString b = response.getFirstPart(0);
+              T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+              S s = ci.getPromotedValueFromProto(t);
+              pair.setFirst(s);
               ByteBuffer bb = ByteBuffer.allocate(8).put(
                   getBytesFromResponse(response.getSecondPart()));
               bb.rewind();
@@ -408,8 +422,9 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> double avg(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, Scan scan) throws Throwable {
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  double avg(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable {
     Pair<S, Long> p = getAvgArgs(tableName, ci, scan);
     return ci.divideForAvg(p.getFirst(), p.getSecond());
   }
@@ -425,8 +440,9 @@ public class AggregationClient {
    * @return
    * @throws Throwable
    */
-  private <R, S> Pair<List<S>, Long> getStdArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<List<S>, Long> getStdArgs(final byte[] tableName,
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     class StdCallback implements Batch.Callback<Pair<List<S>, Long>> {
       long rowCountVal = 0l;
@@ -474,8 +490,10 @@ public class AggregationClient {
               }
               List<S> list = new ArrayList<S>();
               for (int i = 0; i < response.getFirstPartCount(); i++) {
-                list.add(ci.parseResponseAsPromotedType(
-                    getBytesFromResponse(response.getFirstPart(i))));
+                ByteString b = response.getFirstPart(i);
+                T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+                S s = ci.getPromotedValueFromProto(t);
+                list.add(s);
               }
               pair.setFirst(list);
               ByteBuffer bb = ByteBuffer.allocate(8).put(
@@ -505,7 +523,8 @@ public class AggregationClient {
    * @return <R, S>
    * @throws Throwable
    */
-  public <R, S> double std(final byte[] tableName, ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  double std(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
       Scan scan) throws Throwable {
     Pair<List<S>, Long> p = getStdArgs(tableName, ci, scan);
     double res = 0d;
@@ -528,9 +547,10 @@ public class AggregationClient {
    *  (sum of values, sum of weights) for all the regions chosen
    * @throws Throwable
    */
-  private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
+  private <R, S, P extends Message, Q extends Message, T extends Message>
+  Pair<NavigableMap<byte[], List<S>>, List<S>>
   getMedianArgs(final byte[] tableName,
-      final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
+      final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable {
     final AggregateArgument requestArg = validateArgAndGetPB(scan, ci);
     final NavigableMap<byte[], List<S>> map =
       new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
@@ -572,8 +592,10 @@ public class AggregationClient {
 
               List<S> list = new ArrayList<S>();
               for (int i = 0; i < response.getFirstPartCount(); i++) {
-                list.add(ci.parseResponseAsPromotedType(
-                    getBytesFromResponse(response.getFirstPart(i))));
+                ByteString b = response.getFirstPart(i);
+                T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b);
+                S s = ci.getPromotedValueFromProto(t);
+                list.add(s);
               }
               return list;
             }
@@ -597,7 +619,8 @@ public class AggregationClient {
    * @return R the median
    * @throws Throwable
    */
-  public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
+  public <R, S, P extends Message, Q extends Message, T extends Message>
+  R median(final byte[] tableName, ColumnInterpreter<R, S, P, Q, T> ci,
       Scan scan) throws Throwable {
     Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
     byte[] startRow = null;
@@ -672,16 +695,17 @@ public class AggregationClient {
     return null;
   }
 
-  <R,S>AggregateArgument validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S> ci)
+  <R, S, P extends Message, Q extends Message, T extends Message> AggregateArgument 
+  validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci)
       throws IOException {
     validateParameters(scan);
     final AggregateArgument.Builder requestBuilder = 
         AggregateArgument.newBuilder();
     requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName());
-    ByteString columnInterpreterSpecificData = null;
-    if ((columnInterpreterSpecificData = ci.columnInterpreterSpecificData()) 
+    P columnInterpreterSpecificData = null;
+    if ((columnInterpreterSpecificData = ci.getRequestData()) 
        != null) {
-      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData);
+      requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString());
     }
     requestBuilder.setScan(ProtobufUtil.toScan(scan));
     return requestBuilder.build();
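
The calls to ProtobufUtil.getParsedGenericInstance(ci.getClass(), n, b) above
look up the interpreter's n-th generic type argument (Q at position 3, T at
position 4) and parse the ByteString as that Message type; the java.lang.reflect
imports added at the top of the file hint at this approach. The ProtobufUtil
diff is not shown in this excerpt; the following is only a hedged sketch of the
mechanics, not the committed code:

    @SuppressWarnings("unchecked")
    static <E extends Message> E getParsedGenericInstance(
        Class<?> runtimeClass, int position, ByteString b) throws IOException {
      // e.g. LongColumnInterpreter extends
      // ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg>
      Type superType = runtimeClass.getGenericSuperclass();
      Type argType =
          ((ParameterizedType) superType).getActualTypeArguments()[position];
      Class<E> msgClass = (Class<E>) argType;
      try {
        // every generated message exposes a static parseFrom(ByteString)
        Method parseFrom = msgClass.getMethod("parseFrom", ByteString.class);
        return (E) parseFrom.invoke(null, b);
      } catch (Exception e) {
        throw new IOException(e);
      }
    }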

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/coprocessor/LongColumnInterpreter.java Sat Jan  5 13:22:04 2013
@@ -19,16 +19,15 @@
 package org.apache.hadoop.hbase.client.coprocessor;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import com.google.protobuf.ByteString;
-
 /**
  * A concrete column interpreter implementation. The cell value is a Long value
  * and its promoted data type is also a Long value. For computing aggregation
@@ -39,7 +38,8 @@ import com.google.protobuf.ByteString;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class LongColumnInterpreter implements ColumnInterpreter<Long, Long> {
+public class LongColumnInterpreter extends ColumnInterpreter<Long, Long,
+                 EmptyMsg, LongMsg, LongMsg> {
 
   public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
       throws IOException {
@@ -97,45 +97,40 @@ public class LongColumnInterpreter imple
     return o;
   }
 
-
   @Override
-  public Long parseResponseAsPromotedType(byte[] response) {
-    ByteBuffer b = ByteBuffer.allocate(8).put(response);
-    b.rewind();
-    long l = b.getLong();
+  public Long castToCellType(Long l) {
     return l;
   }
 
   @Override
-  public Long castToCellType(Long l) {
-    return l;
+  public EmptyMsg getRequestData() {
+    return EmptyMsg.getDefaultInstance();
   }
 
   @Override
-  public ByteString columnInterpreterSpecificData() {
-    // nothing
-    return null;
+  public void initialize(EmptyMsg msg) {
+    //nothing 
   }
 
   @Override
-  public void initialize(ByteString bytes) {
-    // nothing
+  public LongMsg getProtoForCellType(Long t) {
+    LongMsg.Builder builder = LongMsg.newBuilder();
+    return builder.setLongMsg(t).build();
   }
 
   @Override
-  public ByteString getProtoForCellType(Long t) {
-    return getProtoForPromotedOrCellType(t);
+  public LongMsg getProtoForPromotedType(Long s) {
+    LongMsg.Builder builder = LongMsg.newBuilder();
+    return builder.setLongMsg(s).build();
   }
 
   @Override
-  public ByteString getProtoForPromotedType(Long s) {
-    return getProtoForPromotedOrCellType(s);
+  public Long getPromotedValueFromProto(LongMsg r) {
+    return r.getLongMsg();
   }
 
-  private ByteString getProtoForPromotedOrCellType(Long s) {
-    ByteBuffer bb = ByteBuffer.allocate(8).putLong(s);
-    bb.rewind();
-    ByteString bs = ByteString.copyFrom(bb);
-    return bs;
+  @Override
+  public Long getCellValueFromProto(LongMsg q) {
+    return q.getLongMsg();
   }
-}
\ No newline at end of file
+}
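
The net change in LongColumnInterpreter is the wire format: the removed
getProtoForPromotedOrCellType() shipped a raw 8-byte big-endian long, while the
new methods ship a LongMsg. A before/after sketch of the two encodings:

    // before: raw fixed-width bytes
    ByteBuffer bb = ByteBuffer.allocate(8).putLong(42L);
    bb.rewind();
    ByteString oldWire = ByteString.copyFrom(bb);   // always 8 bytes
    // after: a self-describing PB message (tag + varint)
    ByteString newWire =
        LongMsg.newBuilder().setLongMsg(42L).build().toByteString();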

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java Sat Jan  5 13:22:04 2013
@@ -19,6 +19,10 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,6 +46,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -49,13 +54,19 @@ import com.google.protobuf.Service;
 /**
  * A concrete AggregateProtocol implementation. It is a system-level coprocessor
  * that computes aggregate functions at the region level.
- * @param <T>
- * @param <S>
+ * {@link ColumnInterpreter} is used to interpret column values. This class is
+ * parameterized with the same types as {@link ColumnInterpreter}; for a fuller
+ * description of each, refer to {@link ColumnInterpreter}:
+ * @param <T> Cell value data type
+ * @param <S> Promoted data type
+ * @param <P> PB message that is used to transport initializer specific bytes
+ * @param <Q> PB message that is used to transport Cell (<T>) instance
+ * @param <R> PB message that is used to transport Promoted (<S>) instance
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class AggregateImplementation<T, S> extends AggregateService implements
-    CoprocessorService, Coprocessor {
+public class AggregateImplementation<T, S, P extends Message, Q extends Message, R extends Message> 
+extends AggregateService implements CoprocessorService, Coprocessor {
   protected static Log log = LogFactory.getLog(AggregateImplementation.class);
   private RegionCoprocessorEnvironment env;
 
@@ -73,7 +84,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     T max = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -96,7 +107,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (max != null) {
         AggregateResponse.Builder builder = AggregateResponse.newBuilder();
-        builder.addFirstPart(ci.getProtoForCellType(max));
+        builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
         response = builder.build();
       }
     } catch (IOException e) {
@@ -127,7 +138,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     T min = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -149,7 +160,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (min != null) {
         response = AggregateResponse.newBuilder().addFirstPart( 
-          ci.getProtoForCellType(min)).build();
+          ci.getProtoForCellType(min).toByteString()).build();
       }
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -179,7 +190,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     long sum = 0l;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -203,7 +214,7 @@ public class AggregateImplementation<T, 
       } while (hasMoreRows);
       if (sumVal != null) {
         response = AggregateResponse.newBuilder().addFirstPart( 
-          ci.getProtoForPromotedType(sumVal)).build();
+          ci.getProtoForPromotedType(sumVal).toByteString()).build();
       }
     } catch (IOException e) {
       ResponseConverter.setControllerException(controller, e);
@@ -287,7 +298,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     InternalScanner scanner = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       Long rowCountVal = 0l;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -311,7 +322,7 @@ public class AggregateImplementation<T, 
         rowCountVal++;
       } while (hasMoreRows);
       if (sumVal != null) {
-        ByteString first = ci.getProtoForPromotedType(sumVal);
+        ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         pair.addFirstPart(first);
         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
@@ -346,7 +357,7 @@ public class AggregateImplementation<T, 
     InternalScanner scanner = null;
     AggregateResponse response = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumSqVal = null, tempVal = null;
       long rowCountVal = 0l;
       Scan scan = ProtobufUtil.toScan(request.getScan());
@@ -374,8 +385,8 @@ public class AggregateImplementation<T, 
         rowCountVal++;
       } while (hasMoreRows);
       if (sumVal != null) {
-        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
-        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal);
+        ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
+        ByteString first_sumSqVal = ci.getProtoForPromotedType(sumSqVal).toByteString();
         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         pair.addFirstPart(first_sumVal);
         pair.addFirstPart(first_sumSqVal);
@@ -410,7 +421,7 @@ public class AggregateImplementation<T, 
     AggregateResponse response = null;
     InternalScanner scanner = null;
     try {
-      ColumnInterpreter<T, S> ci = constructColumnInterpreterFromRequest(request);
+      ColumnInterpreter<T, S, P, Q, R> ci = constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
       Scan scan = ProtobufUtil.toScan(request.getScan());
       scanner = env.getRegion().getScanner(scan);
@@ -442,9 +453,9 @@ public class AggregateImplementation<T, 
         sumVal = ci.add(sumVal, tempVal);
         sumWeights = ci.add(sumWeights, tempWeight);
       } while (hasMoreRows);
-      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal);
+      ByteString first_sumVal = ci.getProtoForPromotedType(sumVal).toByteString();
       S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights;
-      ByteString first_sumWeights = ci.getProtoForPromotedType(s);
+      ByteString first_sumWeights = ci.getProtoForPromotedType(s).toByteString();
       AggregateResponse.Builder pair = AggregateResponse.newBuilder();
       pair.addFirstPart(first_sumVal);
       pair.addFirstPart(first_sumWeights); 
@@ -462,15 +473,17 @@ public class AggregateImplementation<T, 
   }
 
   @SuppressWarnings("unchecked")
-  ColumnInterpreter<T,S> constructColumnInterpreterFromRequest(
+  ColumnInterpreter<T,S,P,Q,R> constructColumnInterpreterFromRequest(
       AggregateArgument request) throws IOException {
     String className = request.getInterpreterClassName();
     Class<?> cls;
     try {
       cls = Class.forName(className);
-      ColumnInterpreter<T,S> ci = (ColumnInterpreter<T, S>) cls.newInstance();
+      ColumnInterpreter<T,S,P,Q,R> ci = (ColumnInterpreter<T, S, P, Q, R>) cls.newInstance();
       if (request.hasInterpreterSpecificBytes()) {
-        ci.initialize(request.getInterpreterSpecificBytes());
+        ByteString b = request.getInterpreterSpecificBytes();
+        P initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
+        ci.initialize(initMsg);
       }
       return ci;
     } catch (ClassNotFoundException e) {
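
constructColumnInterpreterFromRequest() above now parses the
interpreter-specific bytes into the P message (generic position 2) and hands it
to initialize(). A hypothetical interpreter that actually consumes
initialization data might look like the following sketch (OffsetLongInterpreter
is invented for illustration; LongColumnInterpreter itself uses EmptyMsg and
ignores it):

    // hypothetical: P = LongMsg carries a per-request offset
    public class OffsetLongInterpreter
        extends ColumnInterpreter<Long, Long, LongMsg, LongMsg, LongMsg> {
      private long offset;

      @Override
      public LongMsg getRequestData() {      // client side: serialize offset
        return LongMsg.newBuilder().setLongMsg(offset).build();
      }

      @Override
      public void initialize(LongMsg msg) {  // region side: restore offset
        this.offset = msg.getLongMsg();
      }
      // remaining ColumnInterpreter methods omitted for brevity
    }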

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/ColumnInterpreter.java Sat Jan  5 13:22:04 2013
@@ -24,9 +24,11 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
 import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
 
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Message;
 
 /**
  * Defines how a value for a specific column is interpreted and provides utility
@@ -35,7 +37,8 @@ import com.google.protobuf.ByteString;
  * handle null case gracefully. Refer to {@link LongColumnInterpreter} for an
  * example.
  * <p>
- * Takes two generic parameters. The cell value type of the interpreter is <T>.
+ * Takes two generic parameters and three PB Message type parameters.
+ * The cell value type of the interpreter is <T>.
  * During some computations like sum, average, the return type can be different
  * from the cell value data type; for example, a sum of int cell values might
  * overflow an int result, so we should use Long for its result. Therefore, this
@@ -44,12 +47,19 @@ import com.google.protobuf.ByteString;
  * <S>. There is a conversion method
  * {@link ColumnInterpreter#castToReturnType(Object)} which takes a <T> type and
  * returns a <S> type.
+ * The {@link AggregateImplementation} uses PB messages to initialize the
+ * user's ColumnInterpreter implementation and to send the responses
+ * back to {@link AggregationClient}.
  * @param <T> Cell value data type
  * @param <S> Promoted data type
+ * @param <P> PB message that is used to transport initializer-specific bytes
+ * @param <Q> PB message that is used to transport a Cell (<T>) instance
+ * @param <R> PB message that is used to transport a Promoted (<S>) instance
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public interface ColumnInterpreter<T, S> {
+public abstract class ColumnInterpreter<T, S, P extends Message, 
+Q extends Message, R extends Message> {
 
   /**
    * @param colFamily
@@ -58,7 +68,7 @@ public interface ColumnInterpreter<T, S>
    * @return value of type T
    * @throws IOException
    */
-  T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
+  public abstract T getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
       throws IOException;
 
   /**
@@ -67,36 +77,36 @@ public interface ColumnInterpreter<T, S>
    * @return the sum, or the non-null value if exactly one of them is null;
    * otherwise returns null.
    */
-  public S add(S l1, S l2);
+  public abstract S add(S l1, S l2);
 
   /**
    * returns the maximum value for this type T
    * @return max
    */
 
-  T getMaxValue();
+  public abstract T getMaxValue();
 
-  T getMinValue();
+  public abstract T getMinValue();
 
   /**
    * @param o1
    * @param o2
    * @return multiplication
    */
-  S multiply(S o1, S o2);
+  public abstract S multiply(S o1, S o2);
 
   /**
    * @param o
    * @return increment
    */
-  S increment(S o);
+  public abstract S increment(S o);
 
   /**
    * provides casting opportunity between the data types.
    * @param o
    * @return cast
    */
-  S castToReturnType(T o);
+  public abstract S castToReturnType(T o);
 
   /**
    * This takes care if either of arguments are null. returns 0 if they are
@@ -105,7 +115,7 @@ public interface ColumnInterpreter<T, S>
    * <li>>0 if l1 > l2 or l1 is not null and l2 is null.
    * <li>< 0 if l1 < l2 or l1 is null and l2 is not null.
    */
-  int compare(final T l1, final T l2);
+  public abstract int compare(final T l1, final T l2);
 
   /**
    * used for computing average of <S> data values. Not providing the divide
@@ -114,51 +124,58 @@ public interface ColumnInterpreter<T, S>
    * @param l
    * @return Average
    */
-  double divideForAvg(S o, Long l);
+  public abstract double divideForAvg(S o, Long l);
 
   /**
    * This method should return any additional data that is needed on the
    * server side to construct the ColumnInterpreter. The server
-   * will pass this to the {@link #initialize(ByteString)}
+   * will pass this to the {@link #initialize}
    * method. If there is no ColumnInterpreter-specific data (e.g.,
    * {@link LongColumnInterpreter}) then null should be returned.
    * @return the PB message
    */
-  ByteString columnInterpreterSpecificData();
+  public abstract P getRequestData();
 
   /**
-   * Return the PB for type T
+   * This method should initialize any field(s) of the ColumnInterpreter with
+   * the data in the passed message (used on the server side).
+   * @param msg
+   */
+  public abstract void initialize(P msg);
+  
+  /**
+   * This method gets the PB message corresponding to the cell type
    * @param t
-   * @return PB-message
+   * @return the PB message for the cell-type instance
    */
-  ByteString getProtoForCellType(T t);
+  public abstract Q getProtoForCellType(T t);
 
   /**
-   * Return the PB for type S
-   * @param s
-   * @return PB-message
+   * This method gets the cell value from the PB message
+   * @param q
+   * @return the cell-type instance from the PB message
    */
-  ByteString getProtoForPromotedType(S s);
+  public abstract T getCellValueFromProto(Q q);
 
   /**
-   * This method should initialize any field(s) of the ColumnInterpreter with
-   * a parsing of the passed message bytes (used on the server side).
-   * @param bytes
+   * This method gets the PB message corresponding to the promoted type
+   * @param s
+   * @return the PB message for the promoted-type instance
    */
-  void initialize(ByteString bytes);
-  
+  public abstract R getProtoForPromotedType(S s);
+
   /**
-   * Converts the bytes in the server's response to the expected type S
-   * @param response
-   * @return response of type S constructed from the message
+   * This method gets the promoted value from the PB message
+   * @param r
+   * @return the promoted-type instance from the PB message
    */
-  S parseResponseAsPromotedType(byte[] response);
-  
+  public abstract S getPromotedValueFromProto(R r);
+
   /**
    * The response message comes as type S. This will convert/cast it to T.
    * In some sense, performs the opposite of {@link #castToReturnType(Object)}
    * @param response
    * @return cast
    */
-  T castToCellType(S response);
+  public abstract T castToCellType(S response);
 }

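ColumnInterpreter is now an abstract class with five type parameters, so an implementation has to commit to concrete PB messages for P, Q and R. Below is a compact, illustrative implementation for Long cell values using the same type arguments the tests use (<Long, Long, EmptyMsg, LongMsg, LongMsg>). It mirrors the shape of LongColumnInterpreter, which this commit also updates, but the method bodies are a sketch rather than the committed code, and the setLongMsg/getLongMsg accessors assume LongMsg wraps a single long_msg field per the hbase.proto change in this revision:

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
import org.apache.hadoop.hbase.util.Bytes;

public class DemoLongColumnInterpreter
    extends ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> {

  public Long getValue(byte[] colFamily, byte[] colQualifier, KeyValue kv)
      throws IOException {
    if (kv == null || kv.getValueLength() != Bytes.SIZEOF_LONG) return null;
    return Bytes.toLong(kv.getBuffer(), kv.getValueOffset());
  }

  public Long add(Long l1, Long l2) {
    // Return the non-null argument if exactly one is null; null if both are.
    if (l1 == null || l2 == null) return l1 == null ? l2 : l1;
    return l1 + l2;
  }

  public Long getMaxValue() { return Long.MAX_VALUE; }

  public Long getMinValue() { return Long.MIN_VALUE; }

  public Long multiply(Long o1, Long o2) {
    return (o1 == null || o2 == null) ? null : o1 * o2;
  }

  public Long increment(Long o) { return o == null ? null : o + 1; }

  public Long castToReturnType(Long o) { return o; }

  public int compare(final Long l1, final Long l2) {
    if (l1 == null ^ l2 == null) return l1 == null ? -1 : 1; // exactly one null
    if (l1 == null) return 0;                                // both null
    return l1.compareTo(l2);
  }

  public double divideForAvg(Long o, Long l) {
    return (o == null || l == null) ? Double.NaN
        : o.doubleValue() / l.doubleValue();
  }

  // No interpreter-specific state, so nothing to ship to the server.
  public EmptyMsg getRequestData() { return null; }

  public void initialize(EmptyMsg msg) { /* nothing to do */ }

  public LongMsg getProtoForCellType(Long t) {
    return LongMsg.newBuilder().setLongMsg(t).build();
  }

  public Long getCellValueFromProto(LongMsg q) { return q.getLongMsg(); }

  public LongMsg getProtoForPromotedType(Long s) {
    return LongMsg.newBuilder().setLongMsg(s).build();
  }

  public Long getPromotedValueFromProto(LongMsg r) { return r.getLongMsg(); }

  public Long castToCellType(Long s) { return s; }
}

Because the class extends ColumnInterpreter directly with concrete type arguments, the server can recover P (here EmptyMsg) reflectively via the new ProtobufUtil helper below.
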
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java Sat Jan  5 13:22:04 2013
@@ -22,7 +22,10 @@ import static org.apache.hadoop.hbase.pr
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -1836,4 +1839,39 @@ public final class ProtobufUtil {
       KeyValue.Type.codeToType((byte)kv.getKeyType().getNumber()),
       kv.getValue().toByteArray());
   }
+
+  /**
+   * Get an instance of the argument type declared in a class's signature. The
+   * argument type is assumed to be a PB Message subclass, and the instance is
+   * created by invoking its static parseFrom method on the passed ByteString.
+   * @param runtimeClass the runtime type of the class
+   * @param position the position of the type argument in the class declaration
+   * @param b the ByteString from which the instance should be parsed
+   * @return the instance
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends Message> 
+  T getParsedGenericInstance(Class<?> runtimeClass, int position, ByteString b) 
+      throws IOException {
+    Type type = runtimeClass.getGenericSuperclass();
+    Type argType = ((ParameterizedType)type).getActualTypeArguments()[position];
+    Class<T> classType = (Class<T>)argType;
+    T inst;
+    try {
+      Method m = classType.getMethod("parseFrom", ByteString.class);
+      inst = (T)m.invoke(null, b);
+      return inst;
+    } catch (SecurityException e) {
+      throw new IOException(e);
+    } catch (NoSuchMethodException e) {
+      throw new IOException(e);
+    } catch (IllegalArgumentException e) {
+      throw new IOException(e);
+    } catch (InvocationTargetException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
 }

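getParsedGenericInstance() resolves the Message type by reading the actual type arguments off the runtime class's generic superclass. That means it only works when the interpreter extends ColumnInterpreter directly with concrete Message types; a class that inherits from another concrete interpreter, or that leaves P as a type variable, would not yield a Class at the requested position. A small illustrative driver (the wrapper class name is for demonstration only):

import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;

import com.google.protobuf.ByteString;

public class GenericParseDemo {
  public static void main(String[] args) throws Exception {
    ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
        new LongColumnInterpreter();
    ByteString b = EmptyMsg.getDefaultInstance().toByteString();
    // ci.getClass().getGenericSuperclass() is
    // ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg>; the type
    // argument at position 2 is P = EmptyMsg, so the call below reflectively
    // invokes EmptyMsg.parseFrom(b).
    EmptyMsg initMsg = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 2, b);
    System.out.println("parsed: " + initMsg);
  }
}
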
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java?rev=1429278&r1=1429277&r2=1429278&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java Sat Jan  5 13:22:04 2013
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.LongMsg;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -131,7 +133,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci = 
+        new LongColumnInterpreter();
     long median = aClient.median(TEST_TABLE, ci,
         scan);
     assertEquals(8L, median);
@@ -153,7 +156,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[2]);
     scan.setStopRow(ROWS[14]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
     assertEquals(12, rowCount);
   }
@@ -168,7 +172,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
         scan);
     assertEquals(ROWSIZE, rowCount);
@@ -187,7 +192,8 @@ public class TestAggregateProtocol {
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[2]);
 
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -211,7 +217,8 @@ public class TestAggregateProtocol {
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[5]);
 
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -230,7 +237,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = -1;
     try {
       rowCount = aClient.rowCount(TEST_TABLE, ci, scan);
@@ -245,7 +253,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
         scan);
     assertEquals(20, rowCount);
@@ -256,7 +265,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
     long rowCount = aClient.rowCount(TEST_TABLE, ci,
@@ -277,7 +287,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(19, maximum);
   }
@@ -292,7 +303,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(14, max);
   }
@@ -302,7 +314,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long maximum = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(190, maximum);
   }
@@ -314,7 +327,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(60, max);
   }
@@ -322,7 +336,8 @@ public class TestAggregateProtocol {
   @Test
   public void testMaxWithValidRangeWithNullCF() {
     AggregationClient aClient = new AggregationClient(conf);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Scan scan = new Scan();
     Long max = null;
     try {
@@ -337,7 +352,8 @@ public class TestAggregateProtocol {
   @Test
   public void testMaxWithInvalidRange() {
     AggregationClient aClient = new AggregationClient(conf);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Scan scan = new Scan();
     scan.setStartRow(ROWS[4]);
     scan.setStopRow(ROWS[2]);
@@ -360,7 +376,8 @@ public class TestAggregateProtocol {
     scan.setStopRow(ROWS[4]);
     try {
       AggregationClient aClient = new AggregationClient(conf);
-      final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+      final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+          new LongColumnInterpreter();
       max = aClient.max(TEST_TABLE, ci, scan);
     } catch (Exception e) {
       max = 0;
@@ -376,7 +393,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     max = aClient.max(TEST_TABLE, ci, scan);
     assertEquals(null, max);
   }
@@ -395,7 +413,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(HConstants.EMPTY_START_ROW);
     scan.setStopRow(HConstants.EMPTY_END_ROW);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = aClient.min(TEST_TABLE, ci,
         scan);
     assertEquals(0l, min.longValue());
@@ -411,7 +430,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(5, min);
   }
@@ -423,7 +443,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(HConstants.EMPTY_START_ROW);
     scan.setStopRow(HConstants.EMPTY_END_ROW);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci,
         scan);
     assertEquals(0, min);
@@ -436,7 +457,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(6, min);
   }
@@ -447,7 +469,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
@@ -465,7 +488,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[4]);
     scan.setStopRow(ROWS[2]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
     } catch (Throwable e) {
@@ -480,7 +504,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[6]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     try {
       min = aClient.min(TEST_TABLE, ci, scan);
@@ -496,7 +521,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long min = null;
     min = aClient.min(TEST_TABLE, ci, scan);
     assertEquals(null, min);
@@ -513,7 +539,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci,
         scan);
     assertEquals(190, sum);
@@ -529,7 +556,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(95, sum);
   }
@@ -539,7 +567,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci,
         scan);
     assertEquals(190 + 1900, sum);
@@ -552,7 +581,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     long sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(6 + 60, sum);
   }
@@ -563,7 +593,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     try {
       sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -580,7 +611,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[2]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     try {
       sum = aClient.sum(TEST_TABLE, ci, scan);
@@ -596,7 +628,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Long sum = null;
     sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(null, sum);
@@ -613,7 +646,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci,
         scan);
     assertEquals(9.5, avg, 0);
@@ -629,7 +663,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(9.5, avg, 0);
   }
@@ -639,7 +674,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci,
         scan);
     assertEquals(104.5, avg, 0);
@@ -652,7 +688,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(6 + 60, avg, 0);
   }
@@ -661,7 +698,8 @@ public class TestAggregateProtocol {
   public void testAvgWithValidRangeWithNullCF() {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     try {
       avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -678,7 +716,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[1]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     try {
       avg = aClient.avg(TEST_TABLE, ci, scan);
@@ -694,7 +733,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     Filter f = new PrefixFilter(Bytes.toBytes("foo:bar"));
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double avg = null;
     avg = aClient.avg(TEST_TABLE, ci, scan);
     assertEquals(Double.NaN, avg, 0);
@@ -711,7 +751,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci,
         scan);
     assertEquals(5.766, std, 0.05d);
@@ -727,7 +768,8 @@ public class TestAggregateProtocol {
     scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
     scan.setStartRow(ROWS[5]);
     scan.setStopRow(ROWS[15]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(2.87, std, 0.05d);
   }
@@ -737,7 +779,8 @@ public class TestAggregateProtocol {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci,
         scan);
     assertEquals(63.42, std, 0.05d);
@@ -750,7 +793,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[7]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     double std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(0, std, 0);
   }
@@ -761,7 +805,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[17]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     try {
       std = aClient.std(TEST_TABLE, ci, scan);
@@ -778,7 +823,8 @@ public class TestAggregateProtocol {
     scan.addFamily(TEST_FAMILY);
     scan.setStartRow(ROWS[6]);
     scan.setStopRow(ROWS[1]);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     try {
       std = aClient.std(TEST_TABLE, ci, scan);
@@ -794,7 +840,8 @@ public class TestAggregateProtocol {
     Scan scan = new Scan();
     scan.addFamily(TEST_FAMILY);
     scan.setFilter(f);
-    final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
+    final ColumnInterpreter<Long, Long, EmptyMsg, LongMsg, LongMsg> ci =
+        new LongColumnInterpreter();
     Double std = null;
     std = aClient.std(TEST_TABLE, ci, scan);
     assertEquals(Double.NaN, std, 0);