You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 02:56:06 UTC
svn commit: r1296568 [1/3] - in /incubator/hcatalog/trunk: ./
src/java/org/apache/hcatalog/mapreduce/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
storage-drivers/hbase/src...
Author: toffer
Date: Sat Mar 3 02:56:05 2012
New Revision: 1296568
URL: http://svn.apache.org/viewvc?rev=1296568&view=rev
Log:
HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)
Added:
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Removed:
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Mar 3 02:56:05 2012
@@ -33,6 +33,8 @@ Trunk (unreleased changes)
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
+ HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)
+
HCAT-265 remove deprecated HCatStorageHandler (toffer)
HCAT-239. Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers (vikram.dixit via gates)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Sat Mar 3 02:56:05 2012
@@ -20,11 +20,12 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.mapred.HCatMapRedUtil;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hcatalog.common.HCatConstants;
@@ -36,6 +37,8 @@ import org.apache.hcatalog.common.HCatUt
*/
class DefaultOutputCommitterContainer extends OutputCommitterContainer {
+ private static final Log LOG = LogFactory.getLog(DefaultOutputCommitterContainer.class);
+
/**
* @param context current JobContext
* @param baseCommitter OutputCommitter to contain
@@ -86,11 +89,9 @@ class DefaultOutputCommitterContainer ex
public void cleanupJob(JobContext context) throws IOException {
getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-
//Cancel HCat and JobTracker tokens
try {
- HiveConf hiveConf = HCatUtil.getHiveConf(null,
+ HiveConf hiveConf = HCatUtil.getHiveConf(null,
context.getConfiguration());
HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
String tokenStrForm = client.getTokenStrForm();
@@ -98,7 +99,7 @@ class DefaultOutputCommitterContainer ex
client.cancelDelegationToken(tokenStrForm);
}
} catch (Exception e) {
- throw new IOException("Failed to cancel delegation token",e);
+ LOG.warn("Failed to cancel delegation token", e);
}
}
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Sat Mar 3 02:56:05 2012
@@ -30,6 +30,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.HCatRecord;
import java.io.IOException;
+import java.text.NumberFormat;
/**
* Bare bones implementation of OutputFormatContainer. Does only the required
@@ -38,10 +39,20 @@ import java.io.IOException;
*/
class DefaultOutputFormatContainer extends OutputFormatContainer {
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
super(of);
}
+ static synchronized String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
+ }
/**
* Get the record writer for the job. Uses the Table's default OutputStorageDriver
@@ -53,8 +64,9 @@ class DefaultOutputFormatContainer exten
@Override
public RecordWriter<WritableComparable<?>, HCatRecord>
getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
return new DefaultRecordWriterContainer(context,
- getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context)));
+ getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
}
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Sat Mar 3 02:56:05 2012
@@ -72,7 +72,7 @@ class DefaultRecordWriterContainer exten
public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
InterruptedException {
try {
- getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI));
+ getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
} catch (SerDeException e) {
throw new IOException("Failed to serialize object",e);
}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Sat Mar 3 02:56:05 2012
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+public class HBaseBaseOutputFormat implements OutputFormat<WritableComparable<?>, Put>,
+ HiveOutputFormat<WritableComparable<?>, Put> {
+
+ @Override
+ public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ JobConf jc, Path finalOutPath,
+ Class<? extends Writable> valueClass, boolean isCompressed,
+ Properties tableProperties, Progressable progress)
+ throws IOException {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
+ OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+ outputFormat.checkOutputSpecs(ignored, job);
+ }
+
+ @Override
+ public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress) throws IOException {
+ OutputFormat<WritableComparable<?>, Put> outputFormat = getOutputFormat(job);
+ return outputFormat.getRecordWriter(ignored, job, name, progress);
+ }
+
+ private OutputFormat<WritableComparable<?>, Put> getOutputFormat(JobConf job)
+ throws IOException {
+ String outputInfo = job.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(outputInfo);
+ OutputFormat<WritableComparable<?>, Put> outputFormat = null;
+ if (HBaseHCatStorageHandler.isBulkMode(outputJobInfo)) {
+ outputFormat = new HBaseBulkOutputFormat();
+ } else {
+ outputFormat = new HBaseDirectOutputFormat();
+ }
+ return outputFormat;
+ }
+}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Sat Mar 3 02:56:05 2012
@@ -1,160 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hcatalog.hbase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-
-import java.io.IOException;
/**
- * Class which imports data into HBase via it's "bulk load" feature. Wherein regions
- * are created by the MR job using HFileOutputFormat and then later "moved" into
- * the appropriate region server.
+ * Class which imports data into HBase via its "bulk load" feature, wherein
+ * regions are created by the MR job using HFileOutputFormat and then later
+ * "moved" into the appropriate region server.
*/
-class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
- private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
- private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
- private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
+class HBaseBulkOutputFormat extends HBaseBaseOutputFormat {
+
+ private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
+ new byte[0]);
+ private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
public HBaseBulkOutputFormat() {
- baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
+ baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
}
@Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- baseOutputFormat.checkOutputSpecs(context);
- //Get jobTracker delegation token if security is enabled
- //we need to launch the ImportSequenceFile job
- if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
- JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
- context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
- }
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+ job.setOutputCommitter(HBaseBulkOutputCommitter.class);
+ baseOutputFormat.checkOutputSpecs(ignored, job);
+ getJTDelegationToken(job);
}
@Override
- public RecordWriter<WritableComparable<?>, Put> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- //TODO use a constant/static setter when available
- context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
- context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
- return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+ public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress)
+ throws IOException {
+ long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+ return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
+ ignored, job, name, progress), version);
}
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
- return new HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
+ private void getJTDelegationToken(JobConf job) throws IOException {
+ // Get jobTracker delegation token if security is enabled
+ // we need to launch the ImportSequenceFile job
+ if (job.getBoolean("hadoop.security.authorization", false)) {
+ JobClient jobClient = new JobClient(new JobConf(job));
+ try {
+ job.getCredentials().addToken(new Text("my mr token"),
+ jobClient.getDelegationToken(null));
+ } catch (InterruptedException e) {
+ throw new IOException("Error while getting JT delegation token", e);
+ }
+ }
}
- private static class HBaseBulkRecordWriter extends RecordWriter<WritableComparable<?>,Put> {
- private RecordWriter<WritableComparable<?>,Put> baseWriter;
+ private static class HBaseBulkRecordWriter implements
+ RecordWriter<WritableComparable<?>, Put> {
- public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put> baseWriter) {
+ private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private final Long outputVersion;
+
+ public HBaseBulkRecordWriter(
+ RecordWriter<WritableComparable<?>, Put> baseWriter,
+ Long outputVersion) {
this.baseWriter = baseWriter;
+ this.outputVersion = outputVersion;
}
@Override
- public void write(WritableComparable<?> key, Put value) throws IOException, InterruptedException {
- //we ignore the key
- baseWriter.write(EMPTY_LIST, value);
+ public void write(WritableComparable<?> key, Put value)
+ throws IOException {
+ Put put = value;
+ if (outputVersion != null) {
+ put = new Put(value.getRow(), outputVersion.longValue());
+ for (List<KeyValue> row : value.getFamilyMap().values()) {
+ for (KeyValue el : row) {
+ put.add(el.getFamily(), el.getQualifier(), el.getValue());
+ }
+ }
+ }
+ // we ignore the key
+ baseWriter.write(EMPTY_LIST, put);
}
@Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- baseWriter.close(context);
+ public void close(Reporter reporter) throws IOException {
+ baseWriter.close(reporter);
}
}
- private static class HBaseBulkOutputCommitter extends OutputCommitter {
- private OutputCommitter baseOutputCommitter;
+ public static class HBaseBulkOutputCommitter extends OutputCommitter {
+
+ private final OutputCommitter baseOutputCommitter;
- public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
- this.baseOutputCommitter = baseOutputCommitter;
+ public HBaseBulkOutputCommitter() {
+ baseOutputCommitter = new FileOutputCommitter();
}
@Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.abortTask(context);
+ public void abortTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.abortTask(taskContext);
}
@Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.commitTask(context);
+ public void commitTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.commitTask(taskContext);
}
@Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return baseOutputCommitter.needsTaskCommit(context);
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return baseOutputCommitter.needsTaskCommit(taskContext);
}
@Override
- public void setupJob(JobContext context) throws IOException {
- baseOutputCommitter.setupJob(context);
+ public void setupJob(JobContext jobContext) throws IOException {
+ baseOutputCommitter.setupJob(jobContext);
}
@Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.setupTask(context);
+ public void setupTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.setupTask(taskContext);
}
@Override
- public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ baseOutputCommitter.abortJob(jobContext, status);
RevisionManager rm = null;
try {
- baseOutputCommitter.abortJob(jobContext,state);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ rm.abortWriteTransaction(HBaseRevisionManagerUtil
+ .getWriteTransaction(jobContext.getConfiguration()));
} finally {
cleanIntermediate(jobContext);
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
+ baseOutputCommitter.commitJob(jobContext);
RevisionManager rm = null;
try {
- baseOutputCommitter.commitJob(jobContext);
Configuration conf = jobContext.getConfiguration();
- Path srcPath = FileOutputFormat.getOutputPath(jobContext);
- Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+ Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+ Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
ImportSequenceFile.runJob(jobContext,
- conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
- srcPath,
- destPath);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+ srcPath,
+ destPath);
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
cleanIntermediate(jobContext);
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
- public void cleanIntermediate(JobContext jobContext) throws IOException {
+ private void cleanIntermediate(JobContext jobContext)
+ throws IOException {
FileSystem fs = FileSystem.get(jobContext.getConfiguration());
- fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+ fs.delete(FileOutputFormat.getOutputPath(jobContext.getJobConf()), true);
}
}
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Sat Mar 3 02:56:05 2012
@@ -18,104 +18,135 @@
package org.apache.hcatalog.hbase;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.io.Writable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-
-
-import java.io.IOException;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
/**
- * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
- * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
- * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put
+ * API to write each row to HBase one at a time. Presently it just uses
+ * TableOutputFormat as the underlying implementation; in the future we can
+ * tune this to make the writes faster, such as by permanently disabling WAL,
+ * caching, etc.
*/
-class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+class HBaseDirectOutputFormat extends HBaseBaseOutputFormat {
- private TableOutputFormat<WritableComparable<?>> outputFormat;
+ private TableOutputFormat outputFormat;
public HBaseDirectOutputFormat() {
- this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+ this.outputFormat = new TableOutputFormat();
}
@Override
- public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- return outputFormat.getRecordWriter(context);
+ public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress)
+ throws IOException {
+ long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+ return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
+ progress), version);
}
@Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- outputFormat.checkOutputSpecs(context);
- }
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ job.setOutputCommitter(HBaseDirectOutputCommitter.class);
+ job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
+ job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
+ outputFormat.checkOutputSpecs(ignored, job);
+ }
+
+ private static class HBaseDirectRecordWriter implements
+ RecordWriter<WritableComparable<?>, Put> {
+
+ private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private final Long outputVersion;
+
+ public HBaseDirectRecordWriter(
+ RecordWriter<WritableComparable<?>, Put> baseWriter,
+ Long outputVersion) {
+ this.baseWriter = baseWriter;
+ this.outputVersion = outputVersion;
+ }
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
- }
+ @Override
+ public void write(WritableComparable<?> key, Put value)
+ throws IOException {
+ Put put = value;
+ if (outputVersion != null) {
+ put = new Put(value.getRow(), outputVersion.longValue());
+ for (List<KeyValue> row : value.getFamilyMap().values()) {
+ for (KeyValue el : row) {
+ put.add(el.getFamily(), el.getQualifier(), el.getValue());
+ }
+ }
+ }
+ baseWriter.write(key, put);
+ }
- @Override
- public void setConf(Configuration conf) {
- String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
- conf = new Configuration(conf);
- conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
- outputFormat.setConf(conf);
- }
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ baseWriter.close(reporter);
+ }
- @Override
- public Configuration getConf() {
- return outputFormat.getConf();
}
- private static class HBaseDirectOutputCommitter extends OutputCommitter {
- private OutputCommitter baseOutputCommitter;
+ public static class HBaseDirectOutputCommitter extends OutputCommitter {
- public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
- this.baseOutputCommitter = baseOutputCommitter;
+ public HBaseDirectOutputCommitter() throws IOException {
}
@Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.abortTask(context);
+ public void abortTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.commitTask(context);
+ public void commitTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return baseOutputCommitter.needsTaskCommit(context);
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return false;
}
@Override
- public void setupJob(JobContext context) throws IOException {
- baseOutputCommitter.setupJob(context);
+ public void setupJob(JobContext jobContext) throws IOException {
}
@Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.setupTask(context);
+ public void setupTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ super.abortJob(jobContext, status);
RevisionManager rm = null;
try {
- baseOutputCommitter.abortJob(jobContext, state);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ Transaction writeTransaction = HBaseRevisionManagerUtil
+ .getWriteTransaction(jobContext.getConfiguration());
+ rm.abortWriteTransaction(writeTransaction);
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
@@ -124,11 +155,12 @@ class HBaseDirectOutputFormat extends Ou
public void commitJob(JobContext jobContext) throws IOException {
RevisionManager rm = null;
try {
- baseOutputCommitter.commitJob(jobContext);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(jobContext
+ .getConfiguration()));
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Sat Mar 3 02:56:05 2012
@@ -19,22 +19,24 @@
package org.apache.hcatalog.hbase;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
@@ -50,13 +52,11 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
@@ -74,22 +74,93 @@ import com.facebook.fb303.FacebookBase;
* tables through HCatalog. The implementation is very similar to the
* HiveHBaseStorageHandler, with more details to suit HCatalog.
*/
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook {
+//TODO remove serializable when HCATALOG-282 is fixed
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
- final static public String DEFAULT_PREFIX = "default.";
+ public final static String DEFAULT_PREFIX = "default.";
+ private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
- private Configuration hbaseConf;
-
- private HBaseAdmin admin;
+ private transient Configuration hbaseConf;
+ private transient HBaseAdmin admin;
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- //TODO complete rework and fill this in
+ // Populate jobProperties with input table name, table columns, RM snapshot,
+ // hbase-default.xml and hbase-site.xml
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ try {
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+ jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+ Configuration jobConf = getConf();
+ String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+ jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+ String serSnapshot = (String) inputJobInfo.getProperties().get(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ if (serSnapshot == null) {
+ Configuration conf = addHbaseResources(jobConf);
+ HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+ qualifiedTableName, tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+ HCatUtil.serialize(snapshot));
+ }
+
+ addHbaseResources(jobConf, jobProperties);
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
}
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- //TODO complete rework and fill this in
+ // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+ // Populate RM transaction in OutputJobInfo
+ // In case of bulk mode, populate intermediate output location
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ try {
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+
+ Configuration jobConf = getConf();
+ String txnString = outputJobInfo.getProperties().getProperty(
+ HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ if (txnString == null) {
+ Configuration conf = addHbaseResources(jobConf);
+ Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+
+ if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
+ .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
+ String tableLocation = tableInfo.getTableLocation();
+ String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+ .toString();
+ outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
+ location);
+ // We are writing out an intermediate sequenceFile hence
+ // location is not passed in OutputJobInfo.getLocation()
+ // TODO replace this with a mapreduce constant when available
+ jobProperties.put("mapred.output.dir", location);
+ }
+ }
+
+ jobProperties
+ .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ addHbaseResources(jobConf, jobProperties);
+ addOutputDependencyJars(jobConf);
+ jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
}
/*
@@ -231,7 +302,7 @@ public class HBaseHCatStorageHandler ext
new HTable(hbaseConf, tableDesc.getName());
//Set up znodes in revision manager.
- RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
if (rm instanceof ZKBasedRevisionManager) {
ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
zkRM.setUpZNodes(tableName, new ArrayList<String>(
@@ -295,36 +366,6 @@ public class HBaseHCatStorageHandler ext
return this;
}
-//TODO finish rework remove this
-// /*
-// * @param tableDesc
-// *
-// * @param jobProperties
-// *
-// * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-// * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-// * java.util.Map)
-// */
-// @Override
-// public void configureTableJobProperties(TableDesc tableDesc,
-// Map<String, String> jobProperties) {
-// Properties tableProperties = tableDesc.getProperties();
-//
-// jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-// tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
-//
-// String tableName = tableProperties
-// .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-// if (tableName == null) {
-// tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
-// if (tableName.startsWith(DEFAULT_PREFIX)) {
-// tableName = tableName.substring(DEFAULT_PREFIX.length());
-// }
-// }
-// jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
-//
-// }
-
private HBaseAdmin getHBaseAdmin() throws MetaException {
try {
if (admin == null) {
@@ -356,14 +397,12 @@ public class HBaseHCatStorageHandler ext
@Override
public Class<? extends InputFormat> getInputFormatClass() {
- //TODO replace this with rework
- return InputFormat.class;
+ return HBaseInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- //TODO replace this with rework
- return SequenceFileOutputFormat.class;
+ return HBaseBaseOutputFormat.class;
}
/*
@@ -397,6 +436,7 @@ public class HBaseHCatStorageHandler ext
private void checkDeleteTable(Table table) throws MetaException {
boolean isExternal = MetaStoreUtils.isExternalTable(table);
String tableName = getHBaseTableName(table);
+ RevisionManager rm = null;
try {
if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
// we have created an HBase table, so we delete it to roll back;
@@ -406,7 +446,7 @@ public class HBaseHCatStorageHandler ext
getHBaseAdmin().deleteTable(tableName);
//Set up znodes in revision manager.
- RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
if (rm instanceof ZKBasedRevisionManager) {
ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
zkRM.deleteZNodes(tableName);
@@ -414,6 +454,8 @@ public class HBaseHCatStorageHandler ext
}
} catch (IOException ie) {
throw new MetaException(StringUtils.stringifyException(ie));
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
}
}
@@ -436,9 +478,7 @@ public class HBaseHCatStorageHandler ext
* @param conf
* @throws IOException
*/
- public static void addDependencyJars(Configuration conf) throws IOException {
- //TODO provide a facility/interface for loading/specifying dependencies
- //Ideally this method shouldn't be exposed to the user
+ private void addOutputDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
//hadoop-core
Writable.class,
@@ -452,8 +492,6 @@ public class HBaseHCatStorageHandler ext
HCatOutputFormat.class,
//hive hbase storage handler jar
HBaseSerDe.class,
- //hcat hbase storage driver jar
- HBaseOutputStorageDriver.class,
//hive jar
Table.class,
//libthrift jar
@@ -464,152 +502,86 @@ public class HBaseHCatStorageHandler ext
FacebookBase.class);
}
-
/**
- * Creates the latest snapshot of the table.
- *
- * @param jobConf The job configuration.
- * @param hbaseTableName The fully qualified name of the HBase table.
- * @return An instance of HCatTableSnapshot
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String hbaseTableName ) throws IOException {
-
- RevisionManager rm = null;
- TableSnapshot snpt;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(hbaseTableName);
- } finally {
- if (rm != null)
- rm.close();
- }
-
- String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
- if(inputJobString == null){
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
- }
- InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
- HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
- .convertSnapshot(snpt, inputInfo.getTableInfo());
-
- return hcatSnapshot;
+ * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
+ * @param jobConf existing configuration
+ * @return a new Configuration with hbase-default.xml and hbase-site.xml added
+ */
+ private Configuration addHbaseResources(Configuration jobConf) {
+     // Work on a copy so the configuration handed in by the caller is left untouched.
+     final Configuration withHbaseResources = new Configuration(jobConf);
+     HBaseConfiguration.addHbaseResources(withHbaseResources);
+     return withHbaseResources;
+ }
/**
- * Creates the snapshot using the revision specified by the user.
- *
- * @param jobConf The job configuration.
- * @param tableName The fully qualified name of the table whose snapshot is being taken.
- * @param revision The revision number to use for the snapshot.
- * @return An instance of HCatTableSnapshot.
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String tableName, long revision)
- throws IOException {
-
- TableSnapshot snpt;
- RevisionManager rm = null;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(tableName, revision);
- } finally {
- if (rm != null)
- rm.close();
- }
-
- String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
- if(inputJobString == null){
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
+ * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+ * if they are not already present in the jobConf.
+ * @param jobConf Job configuration
+ * @param newJobProperties Map to which new properties should be added
+ */
+ private void addHbaseResources(Configuration jobConf,
+ Map<String, String> newJobProperties) {
+ Configuration conf = new Configuration(false);
+ HBaseConfiguration.addHbaseResources(conf);
+ for (Entry<String, String> entry : conf) {
+ if (jobConf.get(entry.getKey()) == null)
+ newJobProperties.put(entry.getKey(), entry.getValue());
}
- InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
- HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
- .convertSnapshot(snpt, inputInfo.getTableInfo());
-
- return hcatSnapshot;
}
- /**
- * Gets an instance of revision manager which is opened.
- *
- * @param jobConf The job configuration.
- * @return RevisionManager An instance of revision manager.
- * @throws IOException
- */
- static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
- Properties properties = new Properties();
- String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
- int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
- if (zkHostList != null) {
- String[] splits = zkHostList.split(",");
- StringBuffer sb = new StringBuffer();
- for (String split : splits) {
- sb.append(split);
- sb.append(':');
- sb.append(port);
- sb.append(',');
- }
-
- sb.deleteCharAt(sb.length() - 1);
- properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
- }
- String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
- if (dataDir != null) {
- properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
- }
- String rmClassName = jobConf.get(
- RevisionManager.REVISION_MGR_IMPL_CLASS,
- ZKBasedRevisionManager.class.getName());
- properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
- RevisionManager revisionManger = RevisionManagerFactory
- .getRevisionManager(properties);
- revisionManger.open();
- return revisionManger;
- }
-
- /**
- * Set snapshot as a property.
- *
- * @param snapshot The HCatTableSnapshot to be passed to the job.
- * @param inpJobInfo The InputJobInfo for the job.
- * @throws IOException
- */
- public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
- throws IOException {
- String serializedSnp = HCatUtil.serialize(snapshot);
- inpJobInfo.getProperties().setProperty(
- HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
- }
-
- static Transaction getWriteTransaction(Configuration conf) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
- .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
- }
-
- static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
- conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
- }
-
- /**
- * Get the Revision number that will be assigned to this job's output data
- * @param conf configuration of the job
- * @return the revision number used
- * @throws IOException
- */
- public static long getOutputRevision(Configuration conf) throws IOException {
- return getWriteTransaction(conf).getRevisionNumber();
+ public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+     final HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+     // An absent storer property counts as "false", i.e. not bulk mode.
+     final String configured = tableInfo.getStorerInfo().getProperties().getProperty(
+             HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY, "false");
+     return "true".equals(configured);
+ }
+
+
+ /**
+  * Builds the space separated list of HBase columns to scan, as stored under
+  * {@code TableInputFormat.SCAN_COLUMNS}. The row key entry of the column mapping is
+  * never part of the list.
+  *
+  * @param tableInfo table description; its storer properties carry the HBase column mapping
+  * @param outputColSchema serialized HCatSchema naming the requested columns, or
+  *        {@code null} to scan every non-key entry of the column mapping
+  * @return the space separated column list; empty when no non-key column is selected
+  * @throws IOException if the column mapping property is missing or cannot be parsed,
+  *         or the schema cannot be deserialized
+  */
+ private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+     String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+             .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+     if (hbaseColumnMapping == null) {
+         // Fail with a descriptive error instead of a NullPointerException further down.
+         throw new IOException("HBase column mapping property "
+                 + HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY + " is not set for the table");
+     }
+     StringBuilder builder = new StringBuilder();
+     if (outputColSchema == null) {
+         // No projection requested: take every mapping entry except the row key.
+         for (String column : hbaseColumnMapping.split("[,]")) {
+             if (!column.equals(HBaseSerDe.HBASE_KEY_COL)) {
+                 builder.append(column).append(" ");
+             }
+         }
+     } else {
+         HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+         HCatSchema tableSchema = tableInfo.getDataColumns();
+         List<String> columnFamilies = new ArrayList<String>();
+         List<String> columnQualifiers = new ArrayList<String>();
+         try {
+             HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+                     columnQualifiers, null);
+         } catch (SerDeException e) {
+             throw new IOException(e);
+         }
+         for (String fieldName : outputSchema.getFieldNames()) {
+             // Both lists are filled from the same mapping string, so family and qualifier
+             // must be looked up with the same index: the column's position in the table
+             // schema, not the position inside the projected schema.
+             int position = tableSchema.getPosition(fieldName);
+             String family = columnFamilies.get(position);
+             // We skip the key column.
+             if (!family.equals(HBaseSerDe.HBASE_KEY_COL)) {
+                 builder.append(family).append(":");
+                 String qualifier = columnQualifiers.get(position);
+                 if (qualifier != null) {
+                     builder.append(qualifier);
+                 }
+                 builder.append(" ");
+             }
+         }
+     }
+     // Remove the extra space delimiter; nothing to trim when no column was appended.
+     if (builder.length() > 0) {
+         builder.deleteCharAt(builder.length() - 1);
+     }
+     return builder.toString();
}
}
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1296568&r1=1296567&r2=1296568&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Sat Mar 3 02:56:05 2012
@@ -21,33 +21,31 @@ package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.InputJobInfo;
/**
* This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable{
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
private final TableInputFormat inputFormat;
- private final InputJobInfo jobInfo;
- private Configuration conf;
- public HBaseInputFormat(InputJobInfo jobInfo) {
+ public HBaseInputFormat() {
inputFormat = new TableInputFormat();
- this.jobInfo = jobInfo;
}
/*
@@ -67,20 +65,27 @@ class HBaseInputFormat extends InputForm
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
- InputSplit split, TaskAttemptContext tac) throws IOException,
- InterruptedException {
-
- String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
- TableSplit tSplit = (TableSplit) split;
- HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
- Scan sc = new Scan(inputFormat.getScan());
- sc.setStartRow(tSplit.getStartRow());
- sc.setStopRow(tSplit.getEndRow());
- recordReader.setScan(sc);
- recordReader.setHTable(new HTable(this.conf, tableName));
- recordReader.init();
- return recordReader;
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+         InputSplit split, JobConf job, Reporter reporter)
+         throws IOException {
+     // Recover the InputJobInfo that was serialized into the job configuration.
+     final String serializedJobInfo = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+     final InputJobInfo jobInfo = (InputJobInfo) HCatUtil.deserialize(serializedJobInfo);
+
+     final String hbaseTableName = job.get(TableInputFormat.INPUT_TABLE);
+     final TableSplit tableSplit = (TableSplit) split;
+     final HbaseSnapshotRecordReader reader = new HbaseSnapshotRecordReader(jobInfo, job);
+
+     // Hand the job configuration to the wrapped TableInputFormat before asking for its Scan.
+     inputFormat.setConf(job);
+     final Scan baseScan = inputFormat.getScan();
+     // TODO: Make the caching configurable by the user
+     baseScan.setCaching(200);
+     baseScan.setCacheBlocks(false);
+
+     // Limit a copy of the scan to the row range of this split.
+     final Scan splitScan = new Scan(baseScan);
+     splitScan.setStartRow(tableSplit.getStartRow());
+     splitScan.setStopRow(tableSplit.getEndRow());
+
+     reader.setScan(splitScan);
+     reader.setHTable(new HTable(job, hbaseTableName));
+     reader.init();
+     return reader;
}
/*
@@ -97,35 +102,24 @@ class HBaseInputFormat extends InputForm
* .JobContext)
*/
@Override
- public List<InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
-
- String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
- if (tableName == null) {
- throw new IOException("The input table is not set. The input splits cannot be created.");
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+         throws IOException {
+     // numSplits is not consulted here; the splits come from the wrapped TableInputFormat
+     // and are then converted to the old mapred API type.
+     inputFormat.setConf(job);
+     final org.apache.hadoop.mapreduce.JobContext jobContext =
+             HCatMapRedUtil.createJobContext(job, null, Reporter.NULL);
+     final List<org.apache.hadoop.mapreduce.InputSplit> tableSplits =
+             inputFormat.getSplits(jobContext);
+     return convertSplits(tableSplits);
+ }
+
+ private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+ InputSplit[] converted = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+ (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+ TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+ tableSplit.getStartRow(),
+ tableSplit.getEndRow(), tableSplit.getRegionLocation());
+ converted[i] = newTableSplit;
}
- return inputFormat.getSplits(jobContext);
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- inputFormat.setConf(conf);
- }
-
- public Scan getScan() {
- return inputFormat.getScan();
- }
-
- public void setScan(Scan scan) {
- inputFormat.setScan(scan);
- }
-
- /* @return
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- @Override
- public Configuration getConf() {
- return this.conf;
+ return converted;
}
}
Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1296568&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Sat Mar 3 02:56:05 2012
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
+import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
+import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
+import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
+
+
+/**
+ * Utility methods for interacting with the revision manager: creating and converting table snapshots and handling write transactions.
+ *
+ */
+class HBaseRevisionManagerUtil {
+
+ private final static Log LOG = LogFactory.getLog(HBaseRevisionManagerUtil.class);
+
+ // Private constructor: this class is not meant to be instantiated from outside.
+ private HBaseRevisionManagerUtil() {
+ }
+
+ /**
+  * Takes the latest snapshot of the table and converts it into an HCat snapshot.
+  *
+  * @param jobConf The job configuration used to open the revision manager.
+  * @param hbaseTableName The fully qualified name of the HBase table.
+  * @param tableInfo HCat table information used when converting the snapshot.
+  * @return An instance of HCatTableSnapshot
+  * @throws IOException Signals that an I/O exception has occurred.
+  */
+ static HCatTableSnapshot createSnapshot(Configuration jobConf,
+     String hbaseTableName, HCatTableInfo tableInfo ) throws IOException {
+
+     TableSnapshot hbaseSnapshot;
+     RevisionManager revisionManager = null;
+     try {
+         revisionManager = getOpenedRevisionManager(jobConf);
+         hbaseSnapshot = revisionManager.createSnapshot(hbaseTableName);
+     } finally {
+         // The revision manager is only needed while taking the snapshot.
+         closeRevisionManagerQuietly(revisionManager);
+     }
+
+     return convertSnapshot(hbaseSnapshot, tableInfo);
+ }
+
+ /**
+  * Takes a snapshot of the table at the revision chosen by the user and converts it into
+  * an HCat snapshot, using the table information of the InputJobInfo stored in the job
+  * configuration.
+  *
+  * @param jobConf The job configuration.
+  * @param tableName The fully qualified name of the table whose snapshot is being taken.
+  * @param revision The revision number to use for the snapshot.
+  * @return An instance of HCatTableSnapshot.
+  * @throws IOException Signals that an I/O exception has occurred, or that no
+  *         InputJobInfo is present in the job configuration.
+  */
+ static HCatTableSnapshot createSnapshot(Configuration jobConf,
+     String tableName, long revision)
+     throws IOException {
+
+     RevisionManager revisionManager = null;
+     TableSnapshot hbaseSnapshot;
+     try {
+         revisionManager = getOpenedRevisionManager(jobConf);
+         hbaseSnapshot = revisionManager.createSnapshot(tableName, revision);
+     } finally {
+         closeRevisionManagerQuietly(revisionManager);
+     }
+
+     // The table information needed for the conversion travels serialized in the job conf.
+     String serializedInputInfo = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
+     if (serializedInputInfo == null) {
+         throw new IOException(
+             "InputJobInfo information not found in JobContext. "
+                 + "HCatInputFormat.setInput() not called?");
+     }
+     InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(serializedInputInfo);
+
+     return convertSnapshot(hbaseSnapshot, inputJobInfo.getTableInfo());
+ }
+
+ /**
+  * Gets an instance of revision manager which is opened.
+  *
+  * The ZooKeeper host list handed to the revision manager is built from the HBase quorum
+  * setting by appending the configured client port to every host. Blank quorum entries
+  * are skipped; when no usable host remains the host list property is left unset, exactly
+  * as when the quorum is not configured at all.
+  *
+  * @param jobConf The job configuration.
+  * @return RevisionManager An instance of revision manager; the caller is responsible
+  *         for closing it.
+  * @throws IOException if the revision manager cannot be created or opened.
+  */
+ static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
+
+     Properties properties = new Properties();
+     String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
+     int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
+         HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+     if (zkHostList != null) {
+         StringBuilder hostPorts = new StringBuilder();
+         for (String host : zkHostList.split(",")) {
+             String trimmedHost = host.trim();
+             if (trimmedHost.isEmpty()) {
+                 // Skip blank entries (e.g. from "a,,b" or ",") instead of emitting ":port".
+                 continue;
+             }
+             // Separator goes in front of every host but the first, so nothing has to be
+             // trimmed afterwards (which failed on an empty buffer).
+             if (hostPorts.length() > 0) {
+                 hostPorts.append(',');
+             }
+             hostPorts.append(trimmedHost).append(':').append(port);
+         }
+         if (hostPorts.length() > 0) {
+             properties.put(ZKBasedRevisionManager.HOSTLIST, hostPorts.toString());
+         }
+     }
+     String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
+     if (dataDir != null) {
+         properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+     }
+     // Fall back to the ZooKeeper based implementation when none is configured.
+     String rmClassName = jobConf.get(
+         RevisionManager.REVISION_MGR_IMPL_CLASS,
+         ZKBasedRevisionManager.class.getName());
+     properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
+     RevisionManager revisionManager = RevisionManagerFactory
+         .getRevisionManager(properties);
+     revisionManager.open();
+     return revisionManager;
+ }
+
+ /**
+  * Closes the given revision manager; an IOException raised by the close is logged as a
+  * warning instead of being propagated.
+  *
+  * @param rm The revision manager to close; a null value is ignored.
+  */
+ static void closeRevisionManagerQuietly(RevisionManager rm) {
+     if (rm == null) {
+         return;
+     }
+     try {
+         rm.close();
+     } catch (IOException closeFailure) {
+         LOG.warn("Error while trying to close revision manager", closeFailure);
+     }
+ }
+
+
+ /**
+  * Converts an HBase table snapshot into an HCat table snapshot whose revisions are keyed
+  * by HCat column name. Columns without an entry in the HCat-to-HBase column mapping are
+  * left out of the revision map.
+  *
+  * @param hbaseSnapshot The HBase snapshot supplying per-column-family revisions.
+  * @param hcatTableInfo HCat table information supplying the schema and column mapping.
+  * @return The equivalent HCatTableSnapshot.
+  * @throws IOException if the column mapping cannot be obtained.
+  */
+ static HCatTableSnapshot convertSnapshot(TableSnapshot hbaseSnapshot,
+     HCatTableInfo hcatTableInfo) throws IOException {
+
+     HCatSchema tableSchema = hcatTableInfo.getDataColumns();
+     Map<String, String> columnToFamily = getHCatHBaseColumnMapping(hcatTableInfo);
+     HashMap<String, Long> revisionsByColumn = new HashMap<String, Long>();
+
+     for (HCatFieldSchema field : tableSchema.getFields()) {
+         String columnName = field.getName();
+         if (!columnToFamily.containsKey(columnName)) {
+             continue;
+         }
+         // Each mapped column takes the revision of the column family it maps to.
+         long familyRevision = hbaseSnapshot.getRevision(columnToFamily.get(columnName));
+         revisionsByColumn.put(columnName, familyRevision);
+     }
+
+     return new HCatTableSnapshot(hcatTableInfo.getDatabaseName(),
+         hcatTableInfo.getTableName(), revisionsByColumn, hbaseSnapshot.getLatestRevision());
+ }
+
+ /**
+  * Converts an HCat table snapshot back into an HBase table snapshot whose revisions are
+  * keyed by column family. The snapshot is named "database.table" after the HCat snapshot.
+  *
+  * @param hcatSnapshot The HCat snapshot supplying per-column revisions.
+  * @param hcatTableInfo HCat table information supplying the schema and column mapping.
+  * @return The equivalent TableSnapshot.
+  * @throws IOException if the column mapping cannot be obtained.
+  */
+ static TableSnapshot convertSnapshot(HCatTableSnapshot hcatSnapshot,
+     HCatTableInfo hcatTableInfo) throws IOException {
+
+     HCatSchema tableSchema = hcatTableInfo.getDataColumns();
+     Map<String, Long> revisionsByFamily = new HashMap<String, Long>();
+     Map<String, String> columnToFamily = getHCatHBaseColumnMapping(hcatTableInfo);
+
+     for (HCatFieldSchema field : tableSchema.getFields()) {
+         String columnName = field.getName();
+         if (!hcatSnapshot.containsColumn(columnName)) {
+             continue;
+         }
+         // Only columns present in the HCat snapshot contribute a revision.
+         long columnRevision = hcatSnapshot.getRevision(columnName);
+         revisionsByFamily.put(columnToFamily.get(columnName), columnRevision);
+     }
+
+     String qualifiedName = hcatSnapshot.getDatabaseName() + "."
+         + hcatSnapshot.getTableName();
+     return new TableSnapshot(qualifiedName, revisionsByFamily,
+         hcatSnapshot.getLatestRevision());
+
+ }
+
+ /**
+  * Begins a transaction in the revision manager for the given table.
+  *
+  * The column families taking part in the transaction are read from the table's HBase
+  * column mapping property: each comma separated entry contributes the text before its
+  * first ':' as a family, and entries with an empty family (such as the row key entry)
+  * are ignored.
+  *
+  * @param qualifiedTableName Name of the table
+  * @param tableInfo HCat Table information
+  * @param jobConf Job Configuration
+  * @return The new transaction in revision manager
+  * @throws IOException if the column mapping property is not set, or if the revision
+  *         manager cannot be opened or fails to begin the transaction.
+  */
+ static Transaction beginWriteTransaction(String qualifiedTableName,
+     HCatTableInfo tableInfo, Configuration jobConf) throws IOException {
+     String hBaseColumns = tableInfo.getStorerInfo().getProperties()
+         .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+     if (hBaseColumns == null) {
+         // Fail with a descriptive error before opening a revision manager connection.
+         throw new IOException("HBase column mapping property "
+             + HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY
+             + " is not set for table " + qualifiedTableName);
+     }
+
+     // Parse entry by entry so that an entry without ':' or a qualifier containing ':'
+     // cannot shift the family/qualifier alignment of the entries that follow it.
+     Set<String> families = new HashSet<String>();
+     for (String mappingEntry : hBaseColumns.split(",")) {
+         int separator = mappingEntry.indexOf(':');
+         String family = separator < 0 ? mappingEntry : mappingEntry.substring(0, separator);
+         if (!family.isEmpty()) {
+             families.add(family);
+         }
+     }
+
+     Transaction txn;
+     RevisionManager rm = null;
+     try {
+         rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(jobConf);
+         txn = rm.beginWriteTransaction(qualifiedTableName, new ArrayList<String>(families));
+     } finally {
+         HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+     }
+     return txn;
+ }
+
+ /**
+  * Reads the write transaction stored in the properties of the OutputJobInfo that is
+  * serialized in the configuration.
+  *
+  * @param conf configuration of the job
+  * @return the deserialized write transaction
+  * @throws IOException
+  */
+ static Transaction getWriteTransaction(Configuration conf) throws IOException {
+     String serializedOutputInfo = conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+     OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedOutputInfo);
+     String serializedTxn = outputJobInfo.getProperties()
+         .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+     return (Transaction) HCatUtil.deserialize(serializedTxn);
+ }
+
+ /**
+  * Stores the write transaction in the properties of the OutputJobInfo held by the
+  * configuration and writes the updated OutputJobInfo back into the configuration.
+  *
+  * @param conf configuration of the job
+  * @param txn the write transaction to record
+  * @throws IOException
+  */
+ static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
+     String serializedOutputInfo = conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+     OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedOutputInfo);
+     outputJobInfo.getProperties().setProperty(
+         HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
+     // The job info was deserialized into a copy, so it has to be re-serialized to take effect.
+     conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ }
+
+ /**
+  * Gets the revision number of the write transaction recorded for this job, i.e. the
+  * revision that will be assigned to this job's output data.
+  * @param conf configuration of the job
+  * @return the revision number used
+  * @throws IOException
+  */
+ static long getOutputRevision(Configuration conf) throws IOException {
+     Transaction writeTransaction = getWriteTransaction(conf);
+     return writeTransaction.getRevisionNumber();
+ }
+
+ /**
+  * Builds a map from HCat column name to HBase column family, based on the table's HBase
+  * column mapping property. Columns mapped to the HBase row key are not included.
+  *
+  * @param hcatTableInfo HCat table information supplying the schema and storer properties.
+  * @return map of HCat column name to HBase column family.
+  * @throws IOException if the column mapping cannot be parsed.
+  */
+ private static Map<String, String> getHCatHBaseColumnMapping( HCatTableInfo hcatTableInfo)
+     throws IOException {
+
+     HCatSchema tableSchema = hcatTableInfo.getDataColumns();
+     StorerInfo storerInfo = hcatTableInfo.getStorerInfo();
+     String columnMapping = storerInfo.getProperties().getProperty(
+         HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+
+     Map<String, String> columnToFamily = new HashMap<String, String>();
+     List<String> families = new ArrayList<String>();
+     List<String> qualifiers = new ArrayList<String>();
+     try {
+         HBaseSerDe.parseColumnMapping(columnMapping, families,
+             null, qualifiers, null);
+     } catch (SerDeException parseFailure) {
+         throw new IOException("Exception while converting snapshots.", parseFailure);
+     }
+
+     for (HCatFieldSchema field : tableSchema.getFields()) {
+         String columnName = field.getName();
+         // The parsed family list is indexed by the column's position in the schema.
+         int position = tableSchema.getPosition(columnName);
+         String family = families.get(position);
+         if (!family.equals(HBaseSerDe.HBASE_KEY_COL)) {
+             columnToFamily.put(columnName, family);
+         }
+     }
+
+     return columnToFamily;
+ }
+
+}