You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/03 14:50:24 UTC

svn commit: r1393466 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift: ThriftResultSerializer.java ThriftUtilities.java

Author: mbautin
Date: Wed Oct  3 12:50:24 2012
New Revision: 1393466

URL: http://svn.apache.org/viewvc?rev=1393466&view=rev
Log:
[jira] [HBASE-6909] [89-fb] Thrift serializer for TableInputFormat

Author: mbautin

Summary: We want to be able to serialize, using Thrift, the results of scanning an HBase table through TableInputFormat, and pipe them to a streaming mapper written in C++. This goes together with the map-reduce changes in D580952.

Test Plan:
- Run a streaming job:
~/*MR/bin/hadoop jar ~/*MR/contrib/streaming/*streaming*.jar -D hbase.mapred.tablecolumns=input_cf -D stream.map.input.ignoreKey=true -D stream.map.input.ignoreNewLine=true -D stream.map.input.serializer.class=org.apache.hadoop.hbase.thrift.ThriftResultSerializer -D hbase.thrift.result.serializer.protocol.class=org.apache.thrift.protocol.TJSONProtocol -D mapred.reduce.tasks=0  -input input_table_name -mapper cat -inputformat org.apache.hadoop.hbase.mapred.TableInputFormat -output /user/mbautin/test_output

- Examine the output and make sure it is JSON-serialized, with no separate key field and no line-end characters.

Reviewers: kranganathan, nzhang, rvadali, liyintang

Reviewed By: liyintang

CC: hbase-eng@, davejwatson

Differential Revision: https://phabricator.fb.com/D581920

Task ID: 1735916

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java?rev=1393466&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java Wed Oct  3 12:50:24 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * A {@link Serializer} for use with {@link TableInputFormat}, e.g. as a Hadoop
+ * streaming input serializer.
+ *
+ * <p>Each {@link Result} is converted to a Thrift {@link TRowResult}. It is
+ * written with the Thrift protocol class named by {@link #PROTOCOL_CONF_KEY}
+ * (default {@link TCompactProtocol}).
+ *
+ * <p>Row keys passed in as {@link ImmutableBytesWritable} are written in their
+ * Writable encoding, not through Thrift.
+ *
+ * <p>Instances hold per-stream state and have no internal synchronization.
+ */
+public class ThriftResultSerializer implements Serializer<Object>, Configurable {
+
+  private static final Log LOG = LogFactory.getLog(ThriftResultSerializer.class);
+
+  /** Configuration key naming the {@link TProtocol} implementation to use. */
+  public static final String PROTOCOL_CONF_KEY = "hbase.thrift.result.serializer.protocol.class";
+
+  /** Stream passed to {@link #open}; closed by {@link #close()}. */
+  private OutputStream out;
+
+  /** Protocol implementation; must have a public constructor taking a {@link TTransport}. */
+  private Class<? extends TProtocol> protocolClass = TCompactProtocol.class;
+
+  /** Protocol used to encode {@link TRowResult}s; created in {@link #open}. */
+  private TProtocol prot;
+  /** Writable-style view of {@link #out}, used for row keys. */
+  private DataOutput dataOut;
+  /** Thrift transport writing straight through to {@link #out}. */
+  private TTransport transport;
+
+  private Configuration conf;
+
+  /**
+   * Prepares this serializer to write to {@code out}.
+   *
+   * <p>Wraps {@code out} in a {@link TIOStreamTransport} and instantiates the
+   * configured protocol on that transport. It also keeps a {@link DataOutput}
+   * view of {@code out} for Writable-encoded row keys.
+   *
+   * @param out the destination stream; closed by {@link #close()}
+   * @throws RuntimeException if the protocol class cannot be instantiated with a
+   *     single {@link TTransport} argument
+   */
+  @Override
+  public void open(OutputStream out) throws IOException {
+    this.out = out;
+    transport = new TIOStreamTransport(out);
+
+    LOG.info("Using Thrift protocol: " + protocolClass.getName());
+
+    try {
+      Constructor<? extends TProtocol> constructor =
+          protocolClass.getConstructor(TTransport.class);
+      prot = constructor.newInstance(transport);
+    } catch (Exception ex) {
+      // Reflection failures (missing constructor, constructor threw, ...) are
+      // configuration errors; keep the class name in the message for diagnosis.
+      throw new RuntimeException("Cannot instantiate Thrift protocol "
+          + protocolClass.getName(), ex);
+    }
+
+    // Reuse the stream directly if it already supports DataOutput.
+    if (out instanceof DataOutput) {
+      dataOut = (DataOutput) out;
+    } else {
+      dataOut = new DataOutputStream(out);
+    }
+  }
+
+  /**
+   * Writes one record to the stream passed to {@link #open}.
+   *
+   * <p>A {@link Result} is converted with {@link ThriftUtilities#oneRowResult}
+   * and encoded with the Thrift protocol. An {@link ImmutableBytesWritable}
+   * (the row key) is written in its Writable format.
+   *
+   * @param t the record to write
+   * @throws IOException if Thrift encoding or the underlying write fails, or if
+   *     {@code t} is null or of an unsupported type
+   */
+  @Override
+  public void serialize(Object t) throws IOException {
+    if (t instanceof Result) {
+      TRowResult tResult = ThriftUtilities.oneRowResult((Result) t);
+      try {
+        tResult.write(prot);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    } else if (t instanceof ImmutableBytesWritable) {
+      // This is used for the row.
+      ((ImmutableBytesWritable) t).write(dataOut);
+    } else if (t == null) {
+      throw new IOException("Cannot serialize a null object");
+    } else {
+      throw new IOException("Cannot serialize class " + t.getClass().getName());
+    }
+  }
+
+  /**
+   * Closes the stream passed to {@link #open}, if {@link #open} was called.
+   *
+   * @throws IOException if closing the underlying stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (out != null) {
+      out.close();
+    }
+  }
+
+  /**
+   * Stores {@code conf} and reads the protocol class from
+   * {@link #PROTOCOL_CONF_KEY}.
+   *
+   * <p>If the key is unset, the current protocol class is kept.
+   *
+   * @param conf the job configuration; may be null, in which case only the
+   *     reference is stored
+   * @throws RuntimeException if the configured class cannot be loaded or does
+   *     not extend {@link TProtocol}
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (conf != null) {
+      // The interface argument makes Hadoop reject non-TProtocol classes here,
+      // rather than letting them fail later inside open().
+      protocolClass = conf.getClass(PROTOCOL_CONF_KEY, protocolClass, TProtocol.class);
+    }
+  }
+
+  /** @return the configuration last passed to {@link #setConf}, or null */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java?rev=1393466&r1=1393465&r2=1393466&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java Wed Oct  3 12:50:24 2012
@@ -137,19 +137,23 @@ public class ThriftUtilities {
       if(result_ == null || result_.isEmpty()) {
         continue;
       }
-      TRowResult result = new TRowResult();
-      result.row = ByteBuffer.wrap(result_.getRow());
-      // No reason to use TreeMap because this will become a HashMap on the client side anyway.
-      result.columns = new HashMap<ByteBuffer, TCell>();
-      for(KeyValue kv : result_.raw()) {
-        result.columns.put(ByteBuffer.wrap(kv.makeColumn()),
-            new TCell(ByteBuffer.wrap(kv.getValue()), kv.getTimestamp()));
-      }
-      results.add(result);
+      results.add(oneRowResult(result_));
     }
     return results;
   }
 
+  /**
+   * Converts one HBase {@link Result} into a Thrift {@link TRowResult}.
+   *
+   * <p>Each {@link KeyValue} becomes a column entry keyed by
+   * {@code kv.makeColumn()} (presumably family:qualifier; confirm against
+   * KeyValue). The value is a {@link TCell} holding the cell value and
+   * timestamp.
+   *
+   * <p>If several KeyValues share a column name, the one iterated last
+   * overwrites the earlier ones in the map.
+   *
+   * @param result_ the HBase result; callers in this file skip null/empty ones
+   * @return a new TRowResult with {@code row} and {@code columns} populated
+   */
+  static TRowResult oneRowResult(Result result_) {
+    TRowResult rowResult = new TRowResult();
+    rowResult.row = ByteBuffer.wrap(result_.getRow());
+
+    // A HashMap suffices: the client side materializes columns as a HashMap anyway.
+    HashMap<ByteBuffer, TCell> cellsByColumn = new HashMap<ByteBuffer, TCell>();
+    KeyValue[] keyValues = result_.raw();
+    for (int i = 0; i < keyValues.length; i++) {
+      KeyValue kv = keyValues[i];
+      ByteBuffer columnName = ByteBuffer.wrap(kv.makeColumn());
+      TCell cell = new TCell(ByteBuffer.wrap(kv.getValue()), kv.getTimestamp());
+      cellsByColumn.put(columnName, cell);
+    }
+    rowResult.columns = cellsByColumn;
+    return rowResult;
+  }
+
   static public List<TRowResult> rowResultFromHBase(Result in) {
     Result [] result = { in };
     return rowResultFromHBase(result);