You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/10/03 14:50:24 UTC
svn commit: r1393466 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift:
ThriftResultSerializer.java ThriftUtilities.java
Author: mbautin
Date: Wed Oct 3 12:50:24 2012
New Revision: 1393466
URL: http://svn.apache.org/viewvc?rev=1393466&view=rev
Log:
[jira] [HBASE-6909] [89-fb] Thrift serializer for TableInputFormat
Author: mbautin
Summary: We want to be able to serialize TableInputFormat's results of scanning an HBase table using Thrift and pipe that to a streaming mapper written in C++. This goes together with map-reduce changes in D580952.
Test Plan:
- Run a streaming job:
~/*MR/bin/hadoop jar ~/*MR/contrib/streaming/*streaming*.jar -D hbase.mapred.tablecolumns=input_cf -D stream.map.input.ignoreKey=true -D stream.map.input.ignoreNewLine=true -D stream.map.input.serializer.class=org.apache.hadoop.hbase.thrift.ThriftResultSerializer -D hbase.thrift.result.serializer.protocol.class=org.apache.thrift.protocol.TJSONProtocol -D mapred.reduce.tasks=0 -input input_table_name -mapper cat -inputformat org.apache.hadoop.hbase.mapred.TableInputFormat -output /user/mbautin/test_output
- Examine the output and make sure it is JSON-serialized, with no separate key field and no line-end characters.
Reviewers: kranganathan, nzhang, rvadali, liyintang
Reviewed By: liyintang
CC: hbase-eng@, davejwatson
Differential Revision: https://phabricator.fb.com/D581920
Task ID: 1735916
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java?rev=1393466&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftResultSerializer.java Wed Oct 3 12:50:24 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * A serializer for use with {@link TableInputFormat}.
+ *
+ * <p>Each {@link Result} is converted to a {@link TRowResult} and written to the
+ * output stream with a Thrift protocol. The protocol class is read from the
+ * configuration key {@link #PROTOCOL_CONF_KEY} and defaults to
+ * {@link TCompactProtocol}. Row keys arrive as {@link ImmutableBytesWritable}
+ * and are written with their own <code>write(DataOutput)</code> method rather
+ * than through Thrift.
+ */
+public class ThriftResultSerializer implements Serializer<Object>, Configurable {
+
+  private static final Log LOG = LogFactory.getLog(ThriftResultSerializer.class);
+
+  /** Configuration key naming the {@link TProtocol} implementation to serialize with. */
+  public static final String PROTOCOL_CONF_KEY = "hbase.thrift.result.serializer.protocol.class";
+
+  /** Stream handed to {@link #open(OutputStream)}; null until the serializer is opened. */
+  private OutputStream out;
+
+  /** Protocol implementation to instantiate in {@link #open(OutputStream)}. */
+  private Class<? extends TProtocol> protocolClass = TCompactProtocol.class;
+
+  private TProtocol prot;
+  private DataOutput dataOut;
+  private TTransport transport;
+
+  private Configuration conf;
+
+  /**
+   * Binds this serializer to the given stream and instantiates the configured
+   * Thrift protocol on top of it.
+   *
+   * @param out the stream all subsequent {@link #serialize(Object)} calls write to
+   * @throws RuntimeException if the protocol class has no accessible
+   *     single-argument {@link TTransport} constructor or cannot be instantiated
+   */
+  @Override
+  public void open(OutputStream out) throws IOException {
+    this.out = out;
+    transport = new TIOStreamTransport(out);
+
+    LOG.info("Using Thrift protocol: " + protocolClass.getName());
+
+    try {
+      Constructor<? extends TProtocol> constructor =
+          protocolClass.getConstructor(TTransport.class);
+      prot = constructor.newInstance(transport);
+    } catch (Exception ex) {
+      // Keep the original unchecked exception type, but say which class failed.
+      throw new RuntimeException(
+          "Could not instantiate Thrift protocol " + protocolClass.getName(), ex);
+    }
+
+    // Row keys are written through the Writable API, which needs a DataOutput;
+    // reuse the stream itself when it already is one.
+    if (out instanceof DataOutput) {
+      dataOut = (DataOutput) out;
+    } else {
+      dataOut = new DataOutputStream(out);
+    }
+  }
+
+  /**
+   * Writes one object to the stream passed to {@link #open(OutputStream)}.
+   *
+   * @param t a {@link Result} (written as a Thrift {@link TRowResult}) or an
+   *     {@link ImmutableBytesWritable} (written with its own <code>write</code> method)
+   * @throws IOException if <code>t</code> is null or of any other type, or if
+   *     the underlying write fails
+   */
+  @Override
+  public void serialize(Object t) throws IOException {
+    if (t == null) {
+      throw new IOException("Cannot serialize a null object");
+    }
+    if (t instanceof Result) {
+      TRowResult tResult = ThriftUtilities.oneRowResult((Result) t);
+      try {
+        tResult.write(prot);
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    } else if (t instanceof ImmutableBytesWritable) {
+      // This is used for the row.
+      ((ImmutableBytesWritable) t).write(dataOut);
+    } else {
+      throw new IOException("Cannot serialize class " + t.getClass().getName());
+    }
+  }
+
+  /**
+   * Closes the underlying output stream. Does nothing if the serializer was
+   * never opened.
+   */
+  @Override
+  public void close() throws IOException {
+    if (out != null) {
+      out.close();
+    }
+  }
+
+  /**
+   * Stores the configuration and picks up the protocol class from
+   * {@link #PROTOCOL_CONF_KEY}, keeping the current class when the key is unset
+   * or <code>conf</code> is null.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (conf != null) {
+      // The three-argument getClass checks that the configured class is a TProtocol.
+      protocolClass = conf.getClass(PROTOCOL_CONF_KEY, protocolClass, TProtocol.class);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java?rev=1393466&r1=1393465&r2=1393466&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java Wed Oct 3 12:50:24 2012
@@ -137,19 +137,23 @@ public class ThriftUtilities {
if(result_ == null || result_.isEmpty()) {
continue;
}
- TRowResult result = new TRowResult();
- result.row = ByteBuffer.wrap(result_.getRow());
- // No reason to use TreeMap because this will become a HashMap on the client side anyway.
- result.columns = new HashMap<ByteBuffer, TCell>();
- for(KeyValue kv : result_.raw()) {
- result.columns.put(ByteBuffer.wrap(kv.makeColumn()),
- new TCell(ByteBuffer.wrap(kv.getValue()), kv.getTimestamp()));
- }
- results.add(result);
+ results.add(oneRowResult(result_));
}
return results;
}
+ /**
+  * Converts a single HBase {@link Result} into a Thrift {@link TRowResult}.
+  * The row key is wrapped as-is, and every KeyValue of the result becomes one
+  * entry in the column map, keyed by the value of <code>makeColumn()</code>
+  * and holding the cell value together with its timestamp.
+  *
+  * @param result_ the HBase result to convert
+  * @return the Thrift representation of the row
+  */
+ static TRowResult oneRowResult(Result result_) {
+   TRowResult rowResult = new TRowResult();
+   rowResult.row = ByteBuffer.wrap(result_.getRow());
+   // No reason to use TreeMap because this will become a HashMap on the client side anyway.
+   HashMap<ByteBuffer, TCell> cells = new HashMap<ByteBuffer, TCell>();
+   for (KeyValue keyValue : result_.raw()) {
+     ByteBuffer column = ByteBuffer.wrap(keyValue.makeColumn());
+     ByteBuffer value = ByteBuffer.wrap(keyValue.getValue());
+     cells.put(column, new TCell(value, keyValue.getTimestamp()));
+   }
+   rowResult.columns = cells;
+   return rowResult;
+ }
+
static public List<TRowResult> rowResultFromHBase(Result in) {
Result [] result = { in };
return rowResultFromHBase(result);