Posted to commits@hive.apache.org by br...@apache.org on 2013/11/13 17:47:12 UTC
svn commit: r1541609 [3/4] - in /hive/trunk: ./ cli/ hbase-handler/
hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/org/apache/hadoop/hive/hbase/
hbase-handler/src/test/queries/negative/ hcatalog/
hcatalog/build-support/ant/...
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.hbase.PutWritable;
import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
@@ -37,9 +38,12 @@ import org.apache.hcatalog.common.HCatCo
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
-public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Put>,
- HiveOutputFormat<WritableComparable<?>, Put> {
-
+/**
+ * Children of this output format can be passed either Puts (typical) or
+ * PutWritables, which come from Hive's HBase SerDe.
+ */
+public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Object>,
+ HiveOutputFormat<WritableComparable<?>, Object> {
@Override
public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath,
@@ -51,22 +55,34 @@ public class HBaseBaseOutputFormat imple
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+ OutputFormat<WritableComparable<?>, Object> outputFormat = getOutputFormat(job);
outputFormat.checkOutputSpecs(ignored, job);
}
@Override
- public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+ public RecordWriter<WritableComparable<?>, Object> getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress) throws IOException {
- OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+ HBaseHCatStorageHandler.setHBaseSerializers(job);
+ OutputFormat<WritableComparable<?>, Object> outputFormat = getOutputFormat(job);
return outputFormat.getRecordWriter(ignored, job, name, progress);
}
- private OutputFormat<WritableComparable<?>, Put> getOutputFormat(JobConf job)
+ protected static Put toPut(Object o) {
+ if(o != null) {
+ if(o instanceof Put) {
+ return (Put)o;
+ } else if(o instanceof PutWritable) {
+ return ((PutWritable)o).getPut();
+ }
+ }
+ throw new IllegalArgumentException("Illegal Argument " + (o == null ? "null" : o.getClass().getName()));
+ }
+
+ private OutputFormat<WritableComparable<?>, Object> getOutputFormat(JobConf job)
throws IOException {
String outputInfo = job.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo);
- OutputFormat<WritableComparable<?>, Put> outputFormat = null;
+ OutputFormat<WritableComparable<?>, Object> outputFormat = null;
if (HBaseHCatStorageHandler.isBulkMode(outputJobInfo)) {
outputFormat = new HBaseBulkOutputFormat();
} else {
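For reference, a minimal standalone sketch of the new Put/PutWritable dispatch above. The ToPutExample class and the sample row key are illustrative only, and the PutWritable(Put) constructor used in main() is an assumption about Hive's wrapper; the getPut() accessor is confirmed by the hunk itself:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hive.hbase.PutWritable;

    public class ToPutExample {
      // Mirrors HBaseBaseOutputFormat.toPut: HCatalog callers hand in a Put
      // directly, while Hive's HBase SerDe hands in a PutWritable wrapper.
      static Put toPut(Object o) {
        if (o instanceof Put) {
          return (Put) o;
        } else if (o instanceof PutWritable) {
          return ((PutWritable) o).getPut();
        }
        throw new IllegalArgumentException(
            "Illegal Argument " + (o == null ? "null" : o.getClass().getName()));
      }

      public static void main(String[] args) {
        Put put = new Put(new byte[] {42});  // illustrative row key
        // Both paths resolve to the same underlying Put.
        System.out.println(toPut(put) == put);                            // true
        System.out.println(toPut(new PutWritable(put)) == put);           // true, assumed ctor
      }
    }
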
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -53,10 +54,10 @@ class HBaseBulkOutputFormat extends HBas
private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
new byte[0]);
- private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
+ private SequenceFileOutputFormat<WritableComparable<?>, Object> baseOutputFormat;
public HBaseBulkOutputFormat() {
- baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
+ baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Object>();
}
@Override
@@ -68,9 +69,10 @@ class HBaseBulkOutputFormat extends HBas
}
@Override
- public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+ public RecordWriter<WritableComparable<?>, Object> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
+ HBaseHCatStorageHandler.setHBaseSerializers(job);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
long version = HBaseRevisionManagerUtil.getOutputRevision(job);
@@ -93,26 +95,28 @@ class HBaseBulkOutputFormat extends HBas
}
private static class HBaseBulkRecordWriter implements
- RecordWriter<WritableComparable<?>, Put> {
+ RecordWriter<WritableComparable<?>, Object> {
- private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private RecordWriter<WritableComparable<?>, Object> baseWriter;
private final Long outputVersion;
public HBaseBulkRecordWriter(
- RecordWriter<WritableComparable<?>, Put> baseWriter,
+ RecordWriter<WritableComparable<?>, Object> baseWriter,
Long outputVersion) {
this.baseWriter = baseWriter;
this.outputVersion = outputVersion;
}
@Override
- public void write(WritableComparable<?> key, Put value)
+ public void write(WritableComparable<?> key, Object value)
throws IOException {
- Put put = value;
+ Put original = toPut(value);
+ Put put = original;
if (outputVersion != null) {
- put = new Put(value.getRow(), outputVersion.longValue());
- for (List<KeyValue> row : value.getFamilyMap().values()) {
- for (KeyValue el : row) {
+ put = new Put(original.getRow(), outputVersion.longValue());
+ for (List<? extends Cell> row : original.getFamilyMap().values()) {
+ for (Cell cell : row) {
+ KeyValue el = (KeyValue)cell;
put.add(el.getFamily(), el.getQualifier(), el.getValue());
}
}
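The write-path change here (and, identically, in HBaseDirectOutputFormat below) is that when an output revision is pinned, the Put is rebuilt so every cell carries that revision as its timestamp. A standalone sketch of just that loop, using only the HBase 0.96 APIs the hunk above already relies on (Cell-typed family map, KeyValue-backed cells):

    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;

    final class RevisionStamp {
      static Put withRevision(Put original, long outputVersion) {
        // New Put keyed on the same row, but stamped with the revision.
        Put stamped = new Put(original.getRow(), outputVersion);
        for (List<? extends Cell> family : original.getFamilyMap().values()) {
          for (Cell cell : family) {
            KeyValue kv = (KeyValue) cell;  // 0.96 Puts are KeyValue-backed
            stamped.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
          }
        }
        return stamped;
      }
    }
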
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
@@ -53,9 +54,10 @@ class HBaseDirectOutputFormat extends HB
}
@Override
- public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+ public RecordWriter<WritableComparable<?>, Object> getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress)
throws IOException {
+ HBaseHCatStorageHandler.setHBaseSerializers(job);
long version = HBaseRevisionManagerUtil.getOutputRevision(job);
return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
progress), version);
@@ -69,26 +71,28 @@ class HBaseDirectOutputFormat extends HB
}
private static class HBaseDirectRecordWriter implements
- RecordWriter<WritableComparable<?>, Put> {
+ RecordWriter<WritableComparable<?>, Object> {
- private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private RecordWriter<WritableComparable<?>, Object> baseWriter;
private final Long outputVersion;
public HBaseDirectRecordWriter(
- RecordWriter<WritableComparable<?>, Put> baseWriter,
+ RecordWriter<WritableComparable<?>, Object> baseWriter,
Long outputVersion) {
this.baseWriter = baseWriter;
this.outputVersion = outputVersion;
}
@Override
- public void write(WritableComparable<?> key, Put value)
+ public void write(WritableComparable<?> key, Object value)
throws IOException {
- Put put = value;
+ Put original = toPut(value);
+ Put put = original;
if (outputVersion != null) {
- put = new Put(value.getRow(), outputVersion.longValue());
- for (List<KeyValue> row : value.getFamilyMap().values()) {
- for (KeyValue el : row) {
+ put = new Put(original.getRow(), outputVersion.longValue());
+ for (List<? extends Cell> row : original.getFamilyMap().values()) {
+ for (Cell cell : row) {
+ KeyValue el = (KeyValue)cell;
put.add(el.getFamily(), el.getQualifier(), el.getValue());
}
}
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Wed Nov 13 16:47:11 2013
@@ -33,11 +33,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
+import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
+import org.apache.hadoop.hbase.mapreduce.ResultSerialization;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -87,6 +89,7 @@ public class HBaseHCatStorageHandler ext
public final static String DEFAULT_PREFIX = "default.";
private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
+ private final static String IO_SERIALIZATIONS = "io.serializations";
private Configuration hbaseConf;
private Configuration jobConf;
@@ -135,7 +138,7 @@ public class HBaseHCatStorageHandler ext
//TODO: Remove when HCAT-308 is fixed
addOutputDependencyJars(jobConf);
jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
+ setHBaseSerializers(jobProperties);
} catch (IOException e) {
throw new IllegalStateException("Error while configuring job properties", e);
}
@@ -191,7 +194,7 @@ public class HBaseHCatStorageHandler ext
jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
addOutputDependencyJars(jobConf);
jobProperties.put("tmpjars", jobConf.get("tmpjars"));
-
+ setHBaseSerializers(jobProperties);
} catch (IOException e) {
throw new IllegalStateException("Error while configuring job properties", e);
}
@@ -339,12 +342,8 @@ public class HBaseHCatStorageHandler ext
RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
rm.createTable(tableName, new ArrayList<String>(uniqueColumnFamilies));
- } catch (MasterNotRunningException mnre) {
- throw new MetaException(StringUtils.stringifyException(mnre));
- } catch (IOException ie) {
- throw new MetaException(StringUtils.stringifyException(ie));
- } catch (IllegalArgumentException iae) {
- throw new MetaException(StringUtils.stringifyException(iae));
+ } catch (Exception e) {
+ throw new MetaException(StringUtils.stringifyException(e));
}
}
@@ -406,10 +405,8 @@ public class HBaseHCatStorageHandler ext
admin = new HBaseAdmin(this.getConf());
}
return admin;
- } catch (MasterNotRunningException mnre) {
- throw new MetaException(StringUtils.stringifyException(mnre));
- } catch (ZooKeeperConnectionException zkce) {
- throw new MetaException(StringUtils.stringifyException(zkce));
+ } catch (Exception e) {
+ throw new MetaException(StringUtils.stringifyException(e));
}
}
@@ -544,8 +541,10 @@ public class HBaseHCatStorageHandler ext
TableMapReduceUtil.addDependencyJars(conf,
//ZK
ZooKeeper.class,
- //HBase
+ //HBase Client
HTable.class,
+ //HBase MapReduce
+ MutationSerialization.class,
//Hive
HiveException.class,
//HCatalog jar
@@ -634,4 +633,20 @@ public class HBaseHCatStorageHandler ext
return builder.toString();
}
+ static void setHBaseSerializers(Configuration configuration) {
+ configuration.setStrings(IO_SERIALIZATIONS, configuration.get(IO_SERIALIZATIONS),
+ MutationSerialization.class.getName(), ResultSerialization.class.getName(),
+ KeyValueSerialization.class.getName());
+ }
+ static void setHBaseSerializers(Map<String, String> configuration) {
+ String value = "";
+ if(configuration.containsKey(IO_SERIALIZATIONS)) {
+ value = configuration.get(IO_SERIALIZATIONS) + ",";
+ } else {
+ value = new Configuration().get(IO_SERIALIZATIONS, WritableSerialization.class.getName()) + ",";
+ }
+ value += MutationSerialization.class.getName() + "," + ResultSerialization.class.getName()
+ + "," + KeyValueSerialization.class.getName();
+ configuration.put(IO_SERIALIZATIONS, value);
+ }
}
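The effect of the two setHBaseSerializers overloads is simply to append HBase's serialization classes to io.serializations without clobbering whatever was already configured (typically WritableSerialization). A minimal sketch of the Configuration variant, wrapped in an illustrative class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization;
    import org.apache.hadoop.hbase.mapreduce.MutationSerialization;
    import org.apache.hadoop.hbase.mapreduce.ResultSerialization;

    public class SerializationsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Keep the existing value (WritableSerialization by default) and
        // append the three HBase serializers so MapReduce can move
        // Mutation/Result/KeyValue values between tasks.
        conf.setStrings("io.serializations", conf.get("io.serializations"),
            MutationSerialization.class.getName(),
            ResultSerialization.class.getName(),
            KeyValueSerialization.class.getName());
        System.out.println(conf.get("io.serializations"));
      }
    }
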
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Wed Nov 13 16:47:11 2013
@@ -23,11 +23,11 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.hbase.ResultWritable;
import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -41,7 +41,7 @@ import org.apache.hcatalog.mapreduce.Inp
/**
* This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/
-class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, ResultWritable> {
private final TableInputFormat inputFormat;
@@ -66,7 +66,7 @@ class HBaseInputFormat implements InputF
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ public RecordReader<ImmutableBytesWritable, ResultWritable> getRecordReader(
InputSplit split, JobConf job, Reporter reporter)
throws IOException {
String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Wed Nov 13 16:47:11 2013
@@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hive.hbase.ResultWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
@@ -50,7 +49,7 @@ import org.slf4j.LoggerFactory;
* The Class HbaseSnapshotRecordReader implements logic for filtering records
* based on snapshot.
*/
-class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, ResultWritable> {
static final Logger LOG = LoggerFactory.getLogger(HbaseSnapshotRecordReader.class);
private final InputJobInfo inpJobInfo;
@@ -62,8 +61,6 @@ class HbaseSnapshotRecordReader implemen
private TableSnapshot snapshot;
private Iterator<Result> resultItr;
private Set<Long> allAbortedTransactions;
- private DataOutputBuffer valueOut = new DataOutputBuffer();
- private DataInputBuffer valueIn = new DataInputBuffer();
HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
this.inpJobInfo = inputJobInfo;
@@ -152,8 +149,8 @@ class HbaseSnapshotRecordReader implemen
}
@Override
- public Result createValue() {
- return new Result();
+ public ResultWritable createValue() {
+ return new ResultWritable();
}
@Override
@@ -170,7 +167,7 @@ class HbaseSnapshotRecordReader implemen
}
@Override
- public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
+ public boolean next(ImmutableBytesWritable key, ResultWritable value) throws IOException {
if (this.resultItr == null) {
LOG.warn("The HBase result iterator is found null. It is possible"
+ " that the record reader has already been closed.");
@@ -182,10 +179,7 @@ class HbaseSnapshotRecordReader implemen
// Update key and value. Currently no way to avoid serialization/de-serialization
// as no setters are available.
key.set(hbaseRow.getRow());
- valueOut.reset();
- hbaseRow.write(valueOut);
- valueIn.reset(valueOut.getData(), valueOut.getLength());
- value.readFields(valueIn);
+ value.setResult(hbaseRow);
return true;
}
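The next() change drops a full serialize/deserialize round trip per row: instead of pushing the Result through a DataOutputBuffer and reading it back, the reader wraps the live object. A minimal sketch, assuming only the setResult accessor used in the hunk above:

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hive.hbase.ResultWritable;

    final class SnapshotValueExample {
      // Old path (removed): valueOut.reset(); hbaseRow.write(valueOut);
      //                     valueIn.reset(...); value.readFields(valueIn);
      // New path: hand the live Result straight to the Writable wrapper.
      static ResultWritable wrap(Result hbaseRow) {
        ResultWritable value = new ResultWritable();
        value.setResult(hbaseRow);
        return value;
      }
    }
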
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Wed Nov 13 16:47:11 2013
@@ -19,23 +19,22 @@
package org.apache.hcatalog.hbase;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
-import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
-
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
@@ -47,11 +46,10 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatMapRedUtil;
+import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH;
-
/**
* MapReduce job which reads a series of Puts stored in a sequence file
@@ -71,12 +69,8 @@ class ImportSequenceFile {
@Override
public void map(ImmutableBytesWritable rowKey, Put value,
Context context)
- throws IOException {
- try {
- context.write(new ImmutableBytesWritable(value.getRow()), value);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ throws IOException, InterruptedException {
+ context.write(new ImmutableBytesWritable(value.getRow()), value);
}
}
@@ -158,6 +152,7 @@ class ImportSequenceFile {
private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
throws IOException {
+ HBaseHCatStorageHandler.setHBaseSerializers(conf);
Job job = new Job(conf, NAME + "_" + tableName);
job.setJarByClass(SequenceFileImporter.class);
FileInputFormat.setInputPaths(job, inputDir);
@@ -170,18 +165,35 @@ class ImportSequenceFile {
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
HFileOutputFormat.configureIncrementalLoad(job, table);
+ URI partitionURI;
+ try {
+ partitionURI = new URI(TotalOrderPartitioner.getPartitionFile(job.getConfiguration())
+ + "#" + TotalOrderPartitioner.DEFAULT_PATH);
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ DistributedCache.addCacheFile(partitionURI, job.getConfiguration());
+ DistributedCache.createSymlink(job.getConfiguration());
//override OutputFormatClass with our own so we can include cleanup in the committer
job.setOutputFormatClass(ImporterOutputFormat.class);
//local mode doesn't support symbolic links so we have to manually set the actual path
if (localMode) {
String partitionFile = null;
- for (URI uri : DistributedCache.getCacheFiles(job.getConfiguration())) {
- if (DEFAULT_PATH.equals(uri.getFragment())) {
+ URI[] uris = DistributedCache.getCacheFiles(job.getConfiguration());
+ if(uris == null) {
+ throw new IllegalStateException("No cache file existed in job configuration");
+ }
+ for (URI uri : uris) {
+ if (TotalOrderPartitioner.DEFAULT_PATH.equals(uri.getFragment())) {
partitionFile = uri.toString();
break;
}
}
+ if(partitionFile == null) {
+ throw new IllegalStateException("Unable to find " +
+ TotalOrderPartitioner.DEFAULT_PATH + " in cache");
+ }
partitionFile = partitionFile.substring(0, partitionFile.lastIndexOf("#"));
job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH, partitionFile.toString());
}
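The bulk-import job now stages the TotalOrderPartitioner's partition file itself. The "#" fragment names the symlink that appears in each task's working directory; local mode, which has no symlinks, falls back to scanning the cache files for that fragment, as the hunk above shows. A sketch with a hypothetical partition-file path (the commit derives the real one from TotalOrderPartitioner.getPartitionFile):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

    final class PartitionFileExample {
      static void shipPartitionFile(Job job) throws Exception {
        // Hypothetical HDFS path; the fragment after "#" becomes the
        // symlink name in each task's working directory.
        URI partitionURI = new URI("hdfs://namenode/tmp/partitions_0000"
            + "#" + TotalOrderPartitioner.DEFAULT_PATH);
        DistributedCache.addCacheFile(partitionURI, job.getConfiguration());
        DistributedCache.createSymlink(job.getConfiguration());
      }
    }
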
Added: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java?rev=1541609&view=auto
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java (added)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RPCConverter.java Wed Nov 13 16:47:11 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RPCConverter {
+
+ List<FamilyRevision> convertFamilyRevisions(List<RevisionManagerEndpointProtos.FamilyRevision> revisions) {
+ List<FamilyRevision> result = new ArrayList<FamilyRevision>();
+ for(RevisionManagerEndpointProtos.FamilyRevision revision : revisions) {
+ result.add(new FamilyRevision(revision.getRevision(), revision.getTimestamp()));
+ }
+ return result;
+ }
+ RevisionManagerEndpointProtos.TableSnapshot convertTableSnapshot(TableSnapshot tableSnapshot) {
+ RevisionManagerEndpointProtos.TableSnapshot.Builder builder =
+ RevisionManagerEndpointProtos.TableSnapshot.newBuilder()
+ .setTableName(tableSnapshot.getTableName())
+ .setLatestRevision(tableSnapshot.getLatestRevision());
+ Map<String, Long> cfRevisionMap = tableSnapshot.getColumnFamilyRevisionMap();
+ for(Map.Entry<String, Long> entry : cfRevisionMap.entrySet()) {
+ builder.addColumnFamilyRevision(RevisionManagerEndpointProtos.TableSnapshot.
+ ColumnFamilyRevision.newBuilder()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
+
+ TableSnapshot convertTableSnapshot(RevisionManagerEndpointProtos.TableSnapshot tableSnapshot) {
+ Map<String, Long> columnFamilyRevisions = new HashMap<String, Long>();
+ for(RevisionManagerEndpointProtos.TableSnapshot.ColumnFamilyRevision rev : tableSnapshot.getColumnFamilyRevisionList()) {
+ columnFamilyRevisions.put(rev.getKey(), rev.getValue());
+ }
+ return new TableSnapshot(tableSnapshot.getTableName(), columnFamilyRevisions, tableSnapshot.getLatestRevision());
+ }
+
+ RevisionManagerEndpointProtos.Transaction convertTransaction(Transaction transaction) {
+ return RevisionManagerEndpointProtos.Transaction.newBuilder()
+ .setTableName(transaction.getTableName())
+ .addAllColumnFamilies(transaction.getColumnFamilies())
+ .setRevision(transaction.getRevisionNumber())
+ .setTimeStamp(transaction.getTimeStamp())
+ .setKeepAlive(transaction.getKeepAliveValue())
+ .build();
+ }
+ Transaction convertTransaction(RevisionManagerEndpointProtos.Transaction transaction) {
+ Transaction result = new Transaction(transaction.getTableName(), transaction.getColumnFamiliesList(), transaction.getRevision(), transaction.getTimeStamp());
+ result.setKeepAlive(transaction.getKeepAlive());
+ return result;
+ }
+}
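RPCConverter is a plain bidirectional mapper between the snapshot POJOs and the generated protobuf messages. A round-trip sketch; it has to live in org.apache.hcatalog.hbase.snapshot because the converters and the Transaction constructor are package-visible, and the table and family names are illustrative:

    package org.apache.hcatalog.hbase.snapshot;

    import java.util.Arrays;

    final class RPCConverterExample {
      public static void main(String[] args) {
        RPCConverter rpc = new RPCConverter();
        Transaction txn =
            new Transaction("web_logs", Arrays.asList("cf"), 7L, System.currentTimeMillis());
        // POJO -> wire message -> POJO; table name, families, revision,
        // timestamp and keep-alive all survive the round trip.
        RevisionManagerEndpointProtos.Transaction wire = rpc.convertTransaction(txn);
        Transaction back = rpc.convertTransaction(wire);
        System.out.println(back.getTableName().equals(txn.getTableName()));
      }
    }
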
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Wed Nov 13 16:47:11 2013
@@ -93,7 +93,7 @@ public interface RevisionManager {
*/
@Deprecated
public Transaction beginWriteTransaction(String table,
- List<String> families, long keepAlive) throws IOException;
+ List<String> families, Long keepAlive) throws IOException;
/**
* Commit the write transaction.
@@ -146,7 +146,7 @@ public interface RevisionManager {
* @throws IOException
*/
@Deprecated
- public TableSnapshot createSnapshot(String tableName, long revision)
+ public TableSnapshot createSnapshot(String tableName, Long revision)
throws IOException;
/**
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Wed Nov 13 16:47:11 2013
@@ -19,14 +19,36 @@
package org.apache.hcatalog.hbase.snapshot;
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.RevisionManagerEndpointService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
/**
* Implementation of RevisionManager as HBase RPC endpoint. This class will control the lifecycle of
* and delegate to the actual RevisionManager implementation and make it available as a service
@@ -34,21 +56,21 @@ import org.slf4j.LoggerFactory;
* In the case of {@link ZKBasedRevisionManager} now only the region servers need write access to
* manage revision data.
*/
-public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
+public class RevisionManagerEndpoint extends RevisionManagerEndpointService implements Coprocessor, CoprocessorService {
private static final Logger LOGGER =
LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
+ private final RPCConverter rpcConverter = new RPCConverter();
private RevisionManager rmImpl = null;
@Override
public void start(CoprocessorEnvironment env) {
- super.start(env);
try {
Configuration conf = RevisionManagerConfiguration.create(env.getConfiguration());
String className = conf.get(RMConstants.REVISION_MGR_ENDPOINT_IMPL_CLASS,
ZKBasedRevisionManager.class.getName());
- LOGGER.debug("Using Revision Manager implementation: {}", className);
+ LOGGER.info("Using Revision Manager implementation: {}", className);
rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
} catch (IOException e) {
LOGGER.error("Failed to initialize revision manager", e);
@@ -57,85 +79,140 @@ public class RevisionManagerEndpoint ext
@Override
public void stop(CoprocessorEnvironment env) {
- if (rmImpl != null) {
- try {
+ try {
+ if (rmImpl != null) {
rmImpl.close();
- } catch (IOException e) {
- LOGGER.warn("Error closing revision manager.", e);
}
+ } catch (IOException e) {
+ LOGGER.warn("Error closing revision manager.", e);
}
- super.stop(env);
- }
-
- @Override
- public void initialize(Configuration conf) {
- // do nothing, HBase controls life cycle
- }
-
- @Override
- public void open() throws IOException {
- // do nothing, HBase controls life cycle
}
@Override
- public void close() throws IOException {
- // do nothing, HBase controls life cycle
+ public Service getService() {
+ return this;
}
@Override
- public void createTable(String table, List<String> columnFamilies) throws IOException {
- rmImpl.createTable(table, columnFamilies);
- }
-
- @Override
- public void dropTable(String table) throws IOException {
- rmImpl.dropTable(table);
- }
-
- @Override
- public Transaction beginWriteTransaction(String table, List<String> families)
- throws IOException {
- return rmImpl.beginWriteTransaction(table, families);
+ public void createTable(RpcController controller,
+ CreateTableRequest request, RpcCallback<CreateTableResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.createTable(request.getTableName(), request.getColumnFamiliesList());
+ done.run(CreateTableResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public Transaction beginWriteTransaction(String table,
- List<String> families, long keepAlive) throws IOException {
- return rmImpl.beginWriteTransaction(table, families, keepAlive);
+ public void dropTable(RpcController controller, DropTableRequest request,
+ RpcCallback<DropTableResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.dropTable(request.getTableName());
+ done.run(DropTableResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public void commitWriteTransaction(Transaction transaction)
- throws IOException {
- rmImpl.commitWriteTransaction(transaction);
+ public void beginWriteTransaction(RpcController controller,
+ BeginWriteTransactionRequest request,
+ RpcCallback<BeginWriteTransactionResponse> done) {
+ if(rmImpl != null) {
+ try {
+ Transaction transaction;
+ if(request.hasKeepAlive()) {
+ transaction = rmImpl.beginWriteTransaction(request.getTableName(), request.getColumnFamiliesList(),
+ request.getKeepAlive());
+ } else {
+ transaction = rmImpl.beginWriteTransaction(request.getTableName(), request.getColumnFamiliesList());
+ }
+ done.run(BeginWriteTransactionResponse.newBuilder()
+ .setTransaction(rpcConverter.convertTransaction(transaction)).build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public void abortWriteTransaction(Transaction transaction)
- throws IOException {
- rmImpl.abortWriteTransaction(transaction);
+ public void commitWriteTransaction(RpcController controller,
+ CommitWriteTransactionRequest request,
+ RpcCallback<CommitWriteTransactionResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.commitWriteTransaction(rpcConverter.convertTransaction(request.getTransaction()));
+ done.run(CommitWriteTransactionResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public TableSnapshot createSnapshot(String tableName) throws IOException {
- return rmImpl.createSnapshot(tableName);
+ public void abortWriteTransaction(RpcController controller,
+ AbortWriteTransactionRequest request,
+ RpcCallback<AbortWriteTransactionResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.abortWriteTransaction(rpcConverter.convertTransaction(request.getTransaction()));
+ done.run(AbortWriteTransactionResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public TableSnapshot createSnapshot(String tableName, long revision)
- throws IOException {
- return rmImpl.createSnapshot(tableName, revision);
+ public void getAbortedWriteTransactions(RpcController controller,
+ GetAbortedWriteTransactionsRequest request,
+ RpcCallback<GetAbortedWriteTransactionsResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.getAbortedWriteTransactions(request.getTableName(), request.getColumnFamily());
+ done.run(GetAbortedWriteTransactionsResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public void keepAlive(Transaction transaction) throws IOException {
- rmImpl.keepAlive(transaction);
+ public void createSnapshot(RpcController controller,
+ CreateSnapshotRequest request, RpcCallback<CreateSnapshotResponse> done) {
+ if(rmImpl != null) {
+ try {
+ TableSnapshot snapshot;
+ if(request.hasRevision()) {
+ snapshot = rmImpl.createSnapshot(request.getTableName(), request.getRevision());
+ } else {
+ snapshot = rmImpl.createSnapshot(request.getTableName());
+ }
+ done.run(CreateSnapshotResponse.newBuilder()
+ .setTableSnapshot(rpcConverter.convertTableSnapshot(snapshot)).build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
@Override
- public List<FamilyRevision> getAbortedWriteTransactions(String table,
- String columnFamily) throws IOException {
- return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+ public void keepAliveTransaction(RpcController controller,
+ KeepAliveTransactionRequest request,
+ RpcCallback<KeepAliveTransactionResponse> done) {
+ if(rmImpl != null) {
+ try {
+ rmImpl.keepAlive(rpcConverter.convertTransaction(request.getTransaction()));
+ done.run(KeepAliveTransactionResponse.newBuilder().build());
+ } catch(IOException e) {
+ ResponseConverter.setControllerException(controller, e);
+ }
+ }
}
}
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Wed Nov 13 16:47:11 2013
@@ -21,12 +21,33 @@ package org.apache.hcatalog.hbase.snapsh
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.AbortWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.BeginWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CommitWriteTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateSnapshotResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.CreateTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.DropTableResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.GetAbortedWriteTransactionsResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionRequest;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.KeepAliveTransactionResponse;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointProtos.RevisionManagerEndpointService;
/**
* This class is nothing but a delegate for the enclosed proxy,
@@ -34,17 +55,18 @@ import org.apache.hadoop.hbase.util.Byte
*/
public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
- private Configuration conf = null;
- private RevisionManager rmProxy;
+ private final RPCConverter rpcConverter = new RPCConverter();
+ private Configuration conf;
+ private HTable htable;
@Override
public Configuration getConf() {
- return this.conf;
+ return conf;
}
@Override
- public void setConf(Configuration arg0) {
- this.conf = arg0;
+ public void setConf(Configuration conf) {
+ this.conf = conf;
}
@Override
@@ -56,70 +78,207 @@ public class RevisionManagerEndpointClie
public void open() throws IOException {
// clone to adjust RPC settings unique to proxy
Configuration clonedConf = new Configuration(conf);
- // conf.set("hbase.ipc.client.connect.max.retries", "0");
- // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
- HTable table = new HTable(clonedConf, HConstants.ROOT_TABLE_NAME);
- rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
- Bytes.toBytes("anyRow"));
- rmProxy.open();
+ htable = new HTable(clonedConf, TableName.META_TABLE_NAME.getNameAsString());
}
@Override
public void close() throws IOException {
- rmProxy.close();
+ htable.close();
}
@Override
- public void createTable(String table, List<String> columnFamilies) throws IOException {
- rmProxy.createTable(table, columnFamilies);
+ public void createTable(final String table, final List<String> columnFamilies) throws IOException {
+ call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+ @Override
+ public Void call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<CreateTableResponse> done =
+ new BlockingRpcCallback<CreateTableResponse>();
+ CreateTableRequest request = CreateTableRequest.newBuilder()
+ .setTableName(table).addAllColumnFamilies(columnFamilies).build();
+ service.createTable(controller, request, done);
+ blockOnResponse(done, controller);
+ return null;
+ }
+ });
}
@Override
- public void dropTable(String table) throws IOException {
- rmProxy.dropTable(table);
+ public void dropTable(final String table) throws IOException {
+ call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+ @Override
+ public Void call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<DropTableResponse> done =
+ new BlockingRpcCallback<DropTableResponse>();
+ DropTableRequest request = DropTableRequest.newBuilder()
+ .setTableName(table).build();
+ service.dropTable(null, request, done);
+ blockOnResponse(done, controller);
+ return null;
+ }
+ });
}
@Override
- public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
- return rmProxy.beginWriteTransaction(table, families);
+ public Transaction beginWriteTransaction(final String table, final List<String> families) throws IOException {
+ return beginWriteTransaction(table, families, null);
}
@Override
- public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+ public Transaction beginWriteTransaction(final String table, final List<String> families, final Long keepAlive)
throws IOException {
- return rmProxy.beginWriteTransaction(table, families, keepAlive);
+ return call(new Batch.Call<RevisionManagerEndpointService, Transaction>() {
+ @Override
+ public Transaction call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<BeginWriteTransactionResponse> done =
+ new BlockingRpcCallback<BeginWriteTransactionResponse>();
+ BeginWriteTransactionRequest.Builder builder = BeginWriteTransactionRequest.newBuilder()
+ .setTableName(table)
+ .addAllColumnFamilies(families);
+ if(keepAlive != null) {
+ builder.setKeepAlive(keepAlive);
+ }
+ service.beginWriteTransaction(controller, builder.build(), done);
+ return rpcConverter.convertTransaction(blockOnResponse(done, controller).getTransaction());
+ }
+ });
}
@Override
- public void commitWriteTransaction(Transaction transaction) throws IOException {
- rmProxy.commitWriteTransaction(transaction);
+ public void commitWriteTransaction(final Transaction transaction) throws IOException {
+ call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+ @Override
+ public Void call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<CommitWriteTransactionResponse> done =
+ new BlockingRpcCallback<CommitWriteTransactionResponse>();
+ CommitWriteTransactionRequest request = CommitWriteTransactionRequest.newBuilder()
+ .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+ service.commitWriteTransaction(controller, request, done);
+ blockOnResponse(done, controller);
+ return null;
+ }
+ });
}
@Override
- public void abortWriteTransaction(Transaction transaction) throws IOException {
- rmProxy.abortWriteTransaction(transaction);
+ public void abortWriteTransaction(final Transaction transaction) throws IOException {
+ call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+ @Override
+ public Void call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<AbortWriteTransactionResponse> done =
+ new BlockingRpcCallback<AbortWriteTransactionResponse>();
+ AbortWriteTransactionRequest request = AbortWriteTransactionRequest.newBuilder()
+ .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+ service.abortWriteTransaction(controller, request, done);
+ blockOnResponse(done, controller);
+ return null;
+ }
+ });
}
@Override
- public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+ public List<FamilyRevision> getAbortedWriteTransactions(final String table, final String columnFamily)
throws IOException {
- return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+ return call(new Batch.Call<RevisionManagerEndpointService, List<FamilyRevision>>() {
+ @Override
+ public List<FamilyRevision> call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<GetAbortedWriteTransactionsResponse> done =
+ new BlockingRpcCallback<GetAbortedWriteTransactionsResponse>();
+ GetAbortedWriteTransactionsRequest request = GetAbortedWriteTransactionsRequest.newBuilder()
+ .setTableName(table)
+ .setColumnFamily(columnFamily)
+ .build();
+ service.getAbortedWriteTransactions(controller, request, done);
+ return rpcConverter.convertFamilyRevisions(blockOnResponse(done, controller).getFamilyRevisionsList());
+ }
+ });
}
@Override
public TableSnapshot createSnapshot(String tableName) throws IOException {
- return rmProxy.createSnapshot(tableName);
+ return createSnapshot(tableName, null);
}
@Override
- public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
- return rmProxy.createSnapshot(tableName, revision);
+ public TableSnapshot createSnapshot(final String tableName, final Long revision) throws IOException {
+ return call(new Batch.Call<RevisionManagerEndpointService, TableSnapshot>() {
+ @Override
+ public TableSnapshot call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<CreateSnapshotResponse> done =
+ new BlockingRpcCallback<CreateSnapshotResponse>();
+ CreateSnapshotRequest.Builder builder = CreateSnapshotRequest.newBuilder()
+ .setTableName(tableName);
+ if(revision != null) {
+ builder.setRevision(revision);
+ }
+ service.createSnapshot(controller, builder.build(), done);
+ return rpcConverter.convertTableSnapshot(blockOnResponse(done, controller).getTableSnapshot());
+ }
+ });
}
@Override
- public void keepAlive(Transaction transaction) throws IOException {
- rmProxy.keepAlive(transaction);
+ public void keepAlive(final Transaction transaction) throws IOException {
+ call(new Batch.Call<RevisionManagerEndpointService, Void>() {
+ @Override
+ public Void call(RevisionManagerEndpointService service)
+ throws IOException {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<KeepAliveTransactionResponse> done =
+ new BlockingRpcCallback<KeepAliveTransactionResponse>();
+ KeepAliveTransactionRequest request = KeepAliveTransactionRequest.newBuilder()
+ .setTransaction(rpcConverter.convertTransaction(transaction)).build();
+ service.keepAliveTransaction(controller, request, done);
+ blockOnResponse(done, controller);
+ return null;
+ }
+ });
+ }
+ private <R> R blockOnResponse(BlockingRpcCallback<R> done, ServerRpcController controller)
+ throws IOException {
+ R response = done.get();
+ if(controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ if(controller.failed()) {
+ String error = controller.errorText();
+ if(error == null) {
+ error = "Server indicated failure but error text was empty";
+ }
+ throw new RuntimeException(error);
+ }
+ return response;
+ }
+ private <R> R call(Batch.Call<RevisionManagerEndpointService, R> callable) throws IOException {
+ try {
+ Map<byte[], R> result = htable.coprocessorService(RevisionManagerEndpointService.class, null, null, callable);
+ if(result.isEmpty()) {
+ return null;
+ }
+ return result.values().iterator().next();
+ } catch(IOException e) {
+ throw (IOException)e;
+ } catch(RuntimeException e) {
+ throw (RuntimeException)e;
+ } catch(Error e) {
+ throw (Error)e;
+ } catch(Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
}
}
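Taken together, the rewritten client opens an HTable against hbase:meta and funnels every RevisionManager call through coprocessorService. A hedged end-to-end usage sketch, assuming the revision data already exists server-side; the table and family names are illustrative:

    import java.util.Arrays;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpointClient;
    import org.apache.hcatalog.hbase.snapshot.Transaction;

    public class RevisionManagerClientExample {
      public static void main(String[] args) throws Exception {
        RevisionManagerEndpointClient rm = new RevisionManagerEndpointClient();
        rm.setConf(HBaseConfiguration.create());
        rm.open();  // binds an HTable on hbase:meta, RPC retries disabled
        try {
          Transaction txn = rm.beginWriteTransaction("web_logs", Arrays.asList("cf"));
          // ... write the revisioned data ...
          rm.commitWriteTransaction(txn);
        } finally {
          rm.close();
        }
      }
    }
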
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Wed Nov 13 16:47:11 2013
@@ -59,7 +59,14 @@ public class TableSnapshot implements Se
* @return List<String> A list of column families associated with the snapshot.
*/
public List<String> getColumnFamilies(){
- return new ArrayList<String>(this.cfRevisionMap.keySet());
+ return new ArrayList<String>(this.cfRevisionMap.keySet());
+ }
+
+ /**
+ * For wire serialization only
+ */
+ Map<String, Long> getColumnFamilyRevisionMap() {
+ return cfRevisionMap;
}
/**
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/Transaction.java Wed Nov 13 16:47:11 2013
@@ -64,6 +64,13 @@ public class Transaction implements Seri
}
/**
+ * For wire serialization only
+ */
+ long getTimeStamp() {
+ return timeStamp;
+ }
+
+ /**
* @return The expire timestamp associated with a transaction.
*/
long getTransactionExpireTimeStamp() {
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Wed Nov 13 16:47:11 2013
@@ -119,7 +119,7 @@ public class ZKBasedRevisionManager impl
* @see org.apache.hcatalog.hbase.snapshot.RevisionManager#beginWriteTransaction(java.lang.String, java.util.List, long)
*/
public Transaction beginWriteTransaction(String table,
- List<String> families, long keepAlive) throws IOException {
+ List<String> families, Long keepAlive) throws IOException {
checkInputParams(table, families);
zkUtil.setUpZnodesForTable(table, families);
@@ -175,7 +175,7 @@ public class ZKBasedRevisionManager impl
*/
public Transaction beginWriteTransaction(String table, List<String> families)
throws IOException {
- return beginWriteTransaction(table, families, -1);
+ return beginWriteTransaction(table, families, -1L);
}
/**
@@ -352,7 +352,7 @@ public class ZKBasedRevisionManager impl
/* @throws IOException
* @see org.apache.hcatalog.hbase.snapshot.RevsionManager#createSnapshot(java.lang.String, long)
*/
- public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+ public TableSnapshot createSnapshot(String tableName, Long revision) throws IOException {
long currentID = zkUtil.currentID(tableName);
if (revision > currentID) {
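
The switch from primitive long to boxed Long in these signatures (keepAlive here, and revision in createSnapshot) matches the optional int64 fields in the protobuf messages added below: a Long can be null when the field is absent on the wire, which a primitive cannot express. A side effect, visible in the test changes further down, is that bare int literals stop compiling against these methods: Java widens int to long, but it will not widen and then box int to Long, hence the -1L, 40L, and 1L at the call sites. A small illustration, assuming the RevisionManager interface these methods implement:

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    class KeepAliveDemo {
      static void demo(RevisionManager manager) throws IOException {
        List<String> families = Arrays.asList("cf1");
        // manager.beginWriteTransaction("t1", families, -1);  // no longer compiles:
        // an int widens to long, but never widens-and-boxes to Long.
        manager.beginWriteTransaction("t1", families, -1L);    // long auto-boxes to Long
        manager.beginWriteTransaction("t1", families, null);   // absence is now expressible
        // (whether a given implementation accepts null is up to that implementation)
      }
    }
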
Added: hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto?rev=1541609&view=auto
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto (added)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/protobuf/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.proto Wed Nov 13 16:47:11 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+option java_package = "org.apache.hcatalog.hbase.snapshot";
+option java_outer_classname = "RevisionManagerEndpointProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message CreateTableRequest {
+ required string table_name = 1;
+ repeated string column_families = 2;
+}
+message CreateTableResponse {}
+
+message DropTableRequest {
+ required string table_name = 1;
+}
+message DropTableResponse {}
+
+message BeginWriteTransactionRequest {
+ required string table_name = 1;
+ optional int64 keep_alive = 2;
+ repeated string column_families = 3;
+}
+message BeginWriteTransactionResponse {
+ required Transaction transaction = 1;
+}
+
+message CommitWriteTransactionRequest {
+ required Transaction transaction = 1;
+}
+message CommitWriteTransactionResponse {}
+
+message AbortWriteTransactionRequest {
+ required Transaction transaction = 1;
+}
+message AbortWriteTransactionResponse {}
+
+message GetAbortedWriteTransactionsRequest {
+ required string table_name = 1;
+ required string column_family = 2;
+}
+message GetAbortedWriteTransactionsResponse {
+ repeated FamilyRevision family_revisions = 1;
+}
+
+message CreateSnapshotRequest {
+ required string table_name = 1;
+ optional int64 revision = 2;
+}
+message CreateSnapshotResponse {
+ required TableSnapshot table_snapshot = 1;
+}
+
+message KeepAliveTransactionRequest {
+ required Transaction transaction = 1;
+}
+message KeepAliveTransactionResponse {}
+
+message FamilyRevision {
+ required int64 revision = 1;
+ required int64 timestamp = 2;
+}
+message Transaction {
+ required string table_name = 1;
+ required int64 time_stamp = 2;
+ required int64 keep_alive = 3;
+ required int64 revision = 4;
+ repeated string column_families = 5;
+}
+message TableSnapshot {
+ required string table_name = 1;
+ required int64 latest_revision = 2;
+ message ColumnFamilyRevision {
+ required string key = 1;
+ required int64 value = 2;
+ }
+ repeated ColumnFamilyRevision column_family_revision = 3;
+}
+
+service RevisionManagerEndpointService {
+ rpc createTable(CreateTableRequest) returns(CreateTableResponse);
+ rpc dropTable(DropTableRequest) returns(DropTableResponse);
+ rpc beginWriteTransaction(BeginWriteTransactionRequest) returns(BeginWriteTransactionResponse);
+ rpc commitWriteTransaction(CommitWriteTransactionRequest) returns(CommitWriteTransactionResponse);
+ rpc abortWriteTransaction(AbortWriteTransactionRequest) returns(AbortWriteTransactionResponse);
+ rpc getAbortedWriteTransactions(GetAbortedWriteTransactionsRequest) returns(GetAbortedWriteTransactionsResponse);
+ rpc createSnapshot(CreateSnapshotRequest) returns(CreateSnapshotResponse);
+ rpc keepAliveTransaction(KeepAliveTransactionRequest) returns(KeepAliveTransactionResponse);
+}
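
For readers unfamiliar with the generated API: protoc compiles each message above into an immutable Java class with a Builder, nested under the RevisionManagerEndpointProtos outer class named by java_outer_classname. A sketch of building and inspecting a request, using only standard protobuf-java codegen (not code from this commit):

    class RequestDemo {
      static Long keepAliveOf() {
        RevisionManagerEndpointProtos.BeginWriteTransactionRequest request =
            RevisionManagerEndpointProtos.BeginWriteTransactionRequest.newBuilder()
                .setTableName("mytable")       // required string table_name = 1
                .setKeepAlive(60000L)          // optional int64 keep_alive = 2
                .addColumnFamilies("cf1")      // repeated string column_families = 3
                .addColumnFamilies("cf2")
                .build();
        // Optional fields carry a presence bit, which is what the Long
        // (rather than long) signatures above map to on the Java side:
        return request.hasKeepAlive() ? request.getKeepAlive() : null;
      }
    }
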
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Wed Nov 13 16:47:11 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
@@ -121,7 +122,7 @@ public class ManyMiniCluster {
protected synchronized void stop() {
if (hbaseCluster != null) {
- HConnectionManager.deleteAllConnections(true);
+ HConnectionManager.deleteAllConnections();
try {
hbaseCluster.shutdown();
} catch (Exception e) {
@@ -266,7 +267,7 @@ public class ManyMiniCluster {
hbaseCluster = new MiniHBaseCluster(hbaseConf, numRegionServers);
hbaseConf.set("hbase.master", hbaseCluster.getMaster().getServerName().getHostAndPort());
//opening the META table ensures that cluster is running
- new HTable(hbaseConf, HConstants.META_TABLE_NAME);
+ new HTable(hbaseConf, TableName.META_TABLE_NAME.getNameAsString());
} catch (Exception e) {
throw new IllegalStateException("Failed to setup HBase Cluster", e);
}
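
Both ManyMiniCluster changes track API movement in HBase 0.95/0.96: deleteAllConnections() dropped its boolean argument, and the META table identifier moved from the byte[] constant HConstants.META_TABLE_NAME to the TableName class, under which the table is named "hbase:meta" rather than the old ".META.". A sketch of the cluster-liveness probe in the newer style, with the HTable closed (the test above leaves it open):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.HTable;

    class MetaProbe {
      // Opening hbase:meta forces region location lookups, so success
      // implies the mini cluster is actually up and serving.
      static void assertClusterRunning(Configuration hbaseConf) throws IOException {
        HTable meta = new HTable(hbaseConf, TableName.META_TABLE_NAME.getNameAsString());
        meta.close();
      }
    }
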
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -19,6 +19,15 @@
package org.apache.hcatalog.hbase;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -71,15 +80,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests components of HBaseHCatStorageHandler using ManyMiniCluster,
* including ImportSequenceFile and HBaseBulkOutputFormat.
@@ -258,6 +258,7 @@ public class TestHBaseBulkOutputFormat e
}
index++;
}
+ table.close();
//test if load count is the same
assertEquals(data.length, index);
//test if scratch directory was erased
@@ -297,6 +298,7 @@ public class TestHBaseBulkOutputFormat e
//create job
+ HBaseHCatStorageHandler.setHBaseSerializers(conf);
Job job = new Job(conf, testName);
job.setWorkingDirectory(new Path(methodTestDir, "mr_work"));
job.setJarByClass(this.getClass());
@@ -335,6 +337,7 @@ public class TestHBaseBulkOutputFormat e
}
index++;
}
+ table.close();
//test if load count is the same
assertEquals(data.length, index);
//test if scratch directory was erased
@@ -432,6 +435,7 @@ public class TestHBaseBulkOutputFormat e
}
index++;
}
+ table.close();
//test if load count is the same
assertEquals(data.length, index);
}
@@ -514,6 +518,7 @@ public class TestHBaseBulkOutputFormat e
}
index++;
}
+ table.close();
//test if load count is the same
assertEquals(data.length, index);
}
@@ -608,6 +613,7 @@ public class TestHBaseBulkOutputFormat e
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(0);
+ table.close();
assertTrue(job.waitForCompletion(true));
}
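
The table.close() calls added throughout these tests release each HTable's write buffer and its reference to the shared HConnection, so the mini-cluster tests stop pinning connections between methods. In fresh code the same intent reads more clearly with try/finally; a minimal sketch of the scan-and-count pattern these tests use:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;

    class ScanCount {
      static int countRows(Configuration conf, String tableName) throws IOException {
        HTable table = new HTable(conf, tableName);
        try {
          ResultScanner scanner = table.getScanner(new Scan());
          try {
            int count = 0;
            for (Result ignored : scanner) {
              count++;
            }
            return count;
          } finally {
            scanner.close();
          }
        } finally {
          table.close();  // released even when an assertion or the scan fails
        }
      }
    }
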
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java Wed Nov 13 16:47:11 2013
@@ -189,6 +189,7 @@ public class TestHBaseDirectOutputFormat
}
index++;
}
+ table.close();
assertEquals(data.length, index);
}
@@ -356,6 +357,7 @@ public class TestHBaseDirectOutputFormat
}
count++;
}
+ table.close();
assertEquals(data.length - 1, count);
// verify that the inputformat returns empty results.
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/TestHCatHBaseInputFormat.java Wed Nov 13 16:47:11 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.hbase.ResultWritable;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -546,7 +547,7 @@ public class TestHCatHBaseInputFormat ex
}
static class MapReadProjectionHTable
- implements org.apache.hadoop.mapred.Mapper<ImmutableBytesWritable, Result, WritableComparable<?>, Text> {
+ implements org.apache.hadoop.mapred.Mapper<ImmutableBytesWritable, Object, WritableComparable<?>, Text> {
static boolean error = false;
static int count = 0;
@@ -560,9 +561,17 @@ public class TestHCatHBaseInputFormat ex
}
@Override
- public void map(ImmutableBytesWritable key, Result result,
+ public void map(ImmutableBytesWritable key, Object resultObj,
OutputCollector<WritableComparable<?>, Text> output, Reporter reporter)
throws IOException {
+ Result result;
+ if (resultObj instanceof Result){
+ result = (Result) resultObj;
+ } else if (resultObj instanceof ResultWritable) {
+ result = ((ResultWritable)resultObj).getResult();
+ } else {
+ throw new IllegalArgumentException("Illegal Argument " + (resultObj == null ? "null" : resultObj.getClass().getName()));
+ }
System.out.println("Result " + result.toString());
List<KeyValue> list = result.list();
boolean correctValues = (list.size() == 1)
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManager.java Wed Nov 13 16:47:11 2013
@@ -210,7 +210,7 @@ public class TestRevisionManager extends
String tableName = newTableName("testTable");
List<String> columnFamilies = Arrays.asList("cf1", "cf2");
Transaction txn = manager.beginWriteTransaction(tableName,
- columnFamilies, 40);
+ columnFamilies, 40L);
Thread.sleep(100);
try {
manager.commitWriteTransaction(txn);
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Wed Nov 13 16:47:11 2013
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase.snapsh
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -135,13 +136,13 @@ public class TestRevisionManagerEndpoint
@Override
public Transaction beginWriteTransaction(String table,
List<String> families) throws IOException {
- return recordCall(null, table, families);
+ return recordCall(new Transaction(table, families, 0L, 0L), table, families);
}
@Override
public Transaction beginWriteTransaction(String table,
- List<String> families, long keepAlive) throws IOException {
- return recordCall(null, table, families, keepAlive);
+ List<String> families, Long keepAlive) throws IOException {
+ return recordCall(new Transaction(table, families, 0L, 0L), table, families, keepAlive);
}
@Override
@@ -157,17 +158,17 @@ public class TestRevisionManagerEndpoint
@Override
public List<FamilyRevision> getAbortedWriteTransactions(String table,
String columnFamily) throws IOException {
- return null;
+ return Collections.singletonList(new FamilyRevision(0L, 0L));
}
@Override
public TableSnapshot createSnapshot(String tableName)
throws IOException {
- return null;
+ return createSnapshot(tableName, 0L);
}
@Override
- public TableSnapshot createSnapshot(String tableName, long revision)
+ public TableSnapshot createSnapshot(String tableName, Long revision)
throws IOException {
TableSnapshot ret = new TableSnapshot(tableName, new HashMap<String, Long>(), revision);
return recordCall(ret, tableName, revision);
@@ -201,7 +202,7 @@ public class TestRevisionManagerEndpoint
Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
call = new MockRM.Invocation("createSnapshot", null, "t3", 1L);
- call.ret = rm.createSnapshot("t3", 1);
+ call.ret = rm.createSnapshot("t3", 1L);
Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
}
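
The mock RevisionManager now returns real objects where it used to return null because the endpoint must encode every return value into the response messages defined above: the transaction and table_snapshot fields are required, so protobuf-java refuses to build a response without them, and a null list cannot be iterated when populating the repeated family_revisions field. A sketch of the failure mode, showing standard generated-code behavior rather than code from this commit:

    import com.google.protobuf.UninitializedMessageException;

    class RequiredFieldDemo {
      static void demo() {
        try {
          // No setTransaction(...) call, so the required field is unset:
          RevisionManagerEndpointProtos.BeginWriteTransactionResponse.newBuilder().build();
        } catch (UninitializedMessageException e) {
          // A null from the mock would surface here (or as a wrapped
          // exception on the RPC path); hence the mock now supplies a
          // Transaction with zeroed revision and timestamp instead.
        }
      }
    }
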
Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Nov 13 16:47:11 2013
@@ -97,18 +97,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
@@ -143,6 +131,64 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop1-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop1-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
@@ -199,6 +245,64 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>${pig.version}</version>
Modified: hive/trunk/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest/pom.xml?rev=1541609&r1=1541608&r2=1541609&view=diff
==============================================================================
--- hive/trunk/itests/qtest/pom.xml (original)
+++ hive/trunk/itests/qtest/pom.xml Wed Nov 13 16:47:11 2013
@@ -105,19 +105,6 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- <scope>test</scope>
- <classifier>tests</classifier>
- </dependency>
</dependencies>
<profiles>
@@ -156,6 +143,58 @@
<version>${hadoop-20S.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop1-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop1-compat</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop1.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -191,6 +230,58 @@
<version>${hadoop-23.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.hadoop2.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>