Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/03 03:56:14 UTC
svn commit: r1296572 - in /incubator/hcatalog/branches/branch-0.4: ./
src/java/org/apache/hcatalog/mapreduce/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/
storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
storage-drivers/h...
Author: toffer
Date: Sat Mar 3 03:56:13 2012
New Revision: 1296572
URL: http://svn.apache.org/viewvc?rev=1296572&view=rev
Log:
HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)
Added:
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
- copied unchanged from r1296568, incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java
Removed:
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java.broken
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java.broken
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java.broken
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseSerDeResultConverter.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestSnapshots.java.broken
Modified:
incubator/hcatalog/branches/branch-0.4/ (props changed)
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
Propchange: incubator/hcatalog/branches/branch-0.4/
------------------------------------------------------------------------------
svn:mergeinfo = /incubator/hcatalog/trunk:1296568
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Sat Mar 3 03:56:13 2012
@@ -21,6 +21,8 @@ Apache HCatalog Change Log
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
+ HCAT-252 Rework HBase storage driver into HBase storage handler (rohini via toffer)
+
HCAT-265 remove deprecated HCatStorageHandler (toffer)
HCAT-239. Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers (vikram.dixit via gates)
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Sat Mar 3 03:56:13 2012
@@ -20,11 +20,12 @@ package org.apache.hcatalog.mapreduce;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.mapred.HCatMapRedUtil;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hcatalog.common.HCatConstants;
@@ -36,6 +37,8 @@ import org.apache.hcatalog.common.HCatUt
*/
class DefaultOutputCommitterContainer extends OutputCommitterContainer {
+ private static final Log LOG = LogFactory.getLog(DefaultOutputCommitterContainer.class);
+
/**
* @param context current JobContext
* @param baseCommitter OutputCommitter to contain
@@ -86,11 +89,9 @@ class DefaultOutputCommitterContainer ex
public void cleanupJob(JobContext context) throws IOException {
getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
- OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
-
//Cancel HCat and JobTracker tokens
try {
- HiveConf hiveConf = HCatUtil.getHiveConf(null,
+ HiveConf hiveConf = HCatUtil.getHiveConf(null,
context.getConfiguration());
HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
String tokenStrForm = client.getTokenStrForm();
@@ -98,7 +99,7 @@ class DefaultOutputCommitterContainer ex
client.cancelDelegationToken(tokenStrForm);
}
} catch (Exception e) {
- throw new IOException("Failed to cancel delegation token",e);
+ LOG.warn("Failed to cancel delegation token", e);
}
}
}
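The cleanupJob change above makes delegation-token cancellation best-effort: a failure is now logged rather than rethrown, so it can no longer fail a job whose output was already committed. A minimal standalone sketch of that pattern (TokenClient and cancelQuietly are hypothetical names for illustration; commons-logging is assumed, as in the diff):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class BestEffortCleanup {
        private static final Log LOG = LogFactory.getLog(BestEffortCleanup.class);

        interface TokenClient {
            void cancelDelegationToken(String tokenStrForm) throws Exception;
        }

        // Cleanup failures are logged at WARN and never rethrown, so token
        // cleanup cannot fail an otherwise successful job.
        static void cancelQuietly(TokenClient client, String tokenStrForm) {
            try {
                if (tokenStrForm != null) {
                    client.cancelDelegationToken(tokenStrForm);
                }
            } catch (Exception e) {
                LOG.warn("Failed to cancel delegation token", e);
            }
        }
    }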
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Sat Mar 3 03:56:13 2012
@@ -30,6 +30,7 @@ import org.apache.hcatalog.common.HCatUt
import org.apache.hcatalog.data.HCatRecord;
import java.io.IOException;
+import java.text.NumberFormat;
/**
* Bare bones implementation of OutputFormatContainer. Does only the required
@@ -38,10 +39,20 @@ import java.io.IOException;
*/
class DefaultOutputFormatContainer extends OutputFormatContainer {
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
super(of);
}
+ static synchronized String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
+ }
/**
* Get the record writer for the job. Uses the Table's default OutputStorageDriver
@@ -53,8 +64,9 @@ class DefaultOutputFormatContainer exten
@Override
public RecordWriter<WritableComparable<?>, HCatRecord>
getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ String name = getOutputName(context.getTaskAttemptID().getTaskID().getId());
return new DefaultRecordWriterContainer(context,
- getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context)));
+ getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()), name, InternalUtil.createReporter(context)));
}
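The getOutputName helper added above reproduces Hadoop's default part-file naming, so the wrapped mapred OutputFormat receives a real task file name instead of null. A self-contained sketch of the same formatting logic:

    import java.text.NumberFormat;

    public class PartName {
        private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();

        static {
            NUMBER_FORMAT.setMinimumIntegerDigits(5); // zero-pad to five digits
            NUMBER_FORMAT.setGroupingUsed(false);     // no "1,234" separators
        }

        static synchronized String getOutputName(int partition) {
            return "part-" + NUMBER_FORMAT.format(partition);
        }

        public static void main(String[] args) {
            System.out.println(getOutputName(7));    // part-00007
            System.out.println(getOutputName(1234)); // part-01234
        }
    }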
Modified: incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Sat Mar 3 03:56:13 2012
@@ -72,7 +72,7 @@ class DefaultRecordWriterContainer exten
public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
InterruptedException {
try {
- getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI));
+ getBaseRecordWriter().write(null, serDe.serialize(value.getAll(), hcatRecordOI));
} catch (SerDeException e) {
throw new IOException("Failed to serialize object",e);
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Sat Mar 3 03:56:13 2012
@@ -1,160 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hcatalog.hbase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-
-import java.io.IOException;
/**
- * Class which imports data into HBase via it's "bulk load" feature. Wherein regions
- * are created by the MR job using HFileOutputFormat and then later "moved" into
- * the appropriate region server.
+ * Class which imports data into HBase via its "bulk load" feature, wherein
+ * regions are created by the MR job using HFileOutputFormat and then later
+ * "moved" into the appropriate region server.
*/
-class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
- private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new byte[0]);
- private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
- private final static Log LOG = LogFactory.getLog(HBaseBulkOutputFormat.class);
+class HBaseBulkOutputFormat extends HBaseBaseOutputFormat {
+
+ private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(
+ new byte[0]);
+ private SequenceFileOutputFormat<WritableComparable<?>, Put> baseOutputFormat;
public HBaseBulkOutputFormat() {
- baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
+ baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>, Put>();
}
@Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- baseOutputFormat.checkOutputSpecs(context);
- //Get jobTracker delegation token if security is enabled
- //we need to launch the ImportSequenceFile job
- if(context.getConfiguration().getBoolean("hadoop.security.authorization",false)) {
- JobClient jobClient = new JobClient(new JobConf(context.getConfiguration()));
- context.getCredentials().addToken(new Text("my mr token"), jobClient.getDelegationToken(null));
- }
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ job.setOutputKeyClass(ImmutableBytesWritable.class);
+ job.setOutputValueClass(Put.class);
+ job.setOutputCommitter(HBaseBulkOutputCommitter.class);
+ baseOutputFormat.checkOutputSpecs(ignored, job);
+ getJTDelegationToken(job);
}
@Override
- public RecordWriter<WritableComparable<?>, Put> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- //TODO use a constant/static setter when available
- context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
- context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
- return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+ public RecordWriter<WritableComparable<?>, Put> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress)
+ throws IOException {
+ long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+ return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(
+ ignored, job, name, progress), version);
}
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
- return new HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
+ private void getJTDelegationToken(JobConf job) throws IOException {
+ // Get jobTracker delegation token if security is enabled
+ // we need to launch the ImportSequenceFile job
+ if (job.getBoolean("hadoop.security.authorization", false)) {
+ JobClient jobClient = new JobClient(new JobConf(job));
+ try {
+ job.getCredentials().addToken(new Text("my mr token"),
+ jobClient.getDelegationToken(null));
+ } catch (InterruptedException e) {
+ throw new IOException("Error while getting JT delegation token", e);
+ }
+ }
}
- private static class HBaseBulkRecordWriter extends RecordWriter<WritableComparable<?>,Put> {
- private RecordWriter<WritableComparable<?>,Put> baseWriter;
+ private static class HBaseBulkRecordWriter implements
+ RecordWriter<WritableComparable<?>, Put> {
- public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put> baseWriter) {
+ private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private final Long outputVersion;
+
+ public HBaseBulkRecordWriter(
+ RecordWriter<WritableComparable<?>, Put> baseWriter,
+ Long outputVersion) {
this.baseWriter = baseWriter;
+ this.outputVersion = outputVersion;
}
@Override
- public void write(WritableComparable<?> key, Put value) throws IOException, InterruptedException {
- //we ignore the key
- baseWriter.write(EMPTY_LIST, value);
+ public void write(WritableComparable<?> key, Put value)
+ throws IOException {
+ Put put = value;
+ if (outputVersion != null) {
+ put = new Put(value.getRow(), outputVersion.longValue());
+ for (List<KeyValue> row : value.getFamilyMap().values()) {
+ for (KeyValue el : row) {
+ put.add(el.getFamily(), el.getQualifier(), el.getValue());
+ }
+ }
+ }
+ // we ignore the key
+ baseWriter.write(EMPTY_LIST, put);
}
@Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- baseWriter.close(context);
+ public void close(Reporter reporter) throws IOException {
+ baseWriter.close(reporter);
}
}
- private static class HBaseBulkOutputCommitter extends OutputCommitter {
- private OutputCommitter baseOutputCommitter;
+ public static class HBaseBulkOutputCommitter extends OutputCommitter {
+
+ private final OutputCommitter baseOutputCommitter;
- public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
- this.baseOutputCommitter = baseOutputCommitter;
+ public HBaseBulkOutputCommitter() {
+ baseOutputCommitter = new FileOutputCommitter();
}
@Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.abortTask(context);
+ public void abortTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.abortTask(taskContext);
}
@Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.commitTask(context);
+ public void commitTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.commitTask(taskContext);
}
@Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return baseOutputCommitter.needsTaskCommit(context);
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return baseOutputCommitter.needsTaskCommit(taskContext);
}
@Override
- public void setupJob(JobContext context) throws IOException {
- baseOutputCommitter.setupJob(context);
+ public void setupJob(JobContext jobContext) throws IOException {
+ baseOutputCommitter.setupJob(jobContext);
}
@Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.setupTask(context);
+ public void setupTask(TaskAttemptContext taskContext)
+ throws IOException {
+ baseOutputCommitter.setupTask(taskContext);
}
@Override
- public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ baseOutputCommitter.abortJob(jobContext, status);
RevisionManager rm = null;
try {
- baseOutputCommitter.abortJob(jobContext,state);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ rm.abortWriteTransaction(HBaseRevisionManagerUtil
+ .getWriteTransaction(jobContext.getConfiguration()));
} finally {
cleanIntermediate(jobContext);
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
+ baseOutputCommitter.commitJob(jobContext);
RevisionManager rm = null;
try {
- baseOutputCommitter.commitJob(jobContext);
Configuration conf = jobContext.getConfiguration();
- Path srcPath = FileOutputFormat.getOutputPath(jobContext);
- Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+ Path srcPath = FileOutputFormat.getOutputPath(jobContext.getJobConf());
+ Path destPath = new Path(srcPath.getParent(), srcPath.getName() + "_hfiles");
ImportSequenceFile.runJob(jobContext,
- conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
- srcPath,
- destPath);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+ srcPath,
+ destPath);
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(conf));
cleanIntermediate(jobContext);
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
- public void cleanIntermediate(JobContext jobContext) throws IOException {
+ private void cleanIntermediate(JobContext jobContext)
+ throws IOException {
FileSystem fs = FileSystem.get(jobContext.getConfiguration());
- fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+ fs.delete(FileOutputFormat.getOutputPath(jobContext.getJobConf()), true);
}
}
}
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputFormat.java Sat Mar 3 03:56:13 2012
@@ -18,104 +18,135 @@
package org.apache.hcatalog.hbase;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
-import org.apache.hadoop.io.Writable;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-
-
-import java.io.IOException;
+import org.apache.hcatalog.hbase.snapshot.Transaction;
/**
- * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put API to write each row to HBase one a
- * time. Presently it is just using TableOutputFormat as the underlying implementation in the future we can
- * tune this to make the writes faster such as permanently disabling WAL, caching, etc.
+ * "Direct" implementation of OutputFormat for HBase. Uses HTable client's put
+ * API to write each row to HBase one a time. Presently it is just using
+ * TableOutputFormat as the underlying implementation in the future we can tune
+ * this to make the writes faster such as permanently disabling WAL, caching,
+ * etc.
*/
-class HBaseDirectOutputFormat extends OutputFormat<WritableComparable<?>,Writable> implements Configurable {
+class HBaseDirectOutputFormat extends HBaseBaseOutputFormat {
- private TableOutputFormat<WritableComparable<?>> outputFormat;
+ private TableOutputFormat outputFormat;
public HBaseDirectOutputFormat() {
- this.outputFormat = new TableOutputFormat<WritableComparable<?>>();
+ this.outputFormat = new TableOutputFormat();
}
@Override
- public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- return outputFormat.getRecordWriter(context);
+ public RecordWriter<WritableComparable<?>, Put> getRecordWriter(FileSystem ignored,
+ JobConf job, String name, Progressable progress)
+ throws IOException {
+ long version = HBaseRevisionManagerUtil.getOutputRevision(job);
+ return new HBaseDirectRecordWriter(outputFormat.getRecordWriter(ignored, job, name,
+ progress), version);
}
@Override
- public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
- outputFormat.checkOutputSpecs(context);
- }
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
+ throws IOException {
+ job.setOutputCommitter(HBaseDirectOutputCommitter.class);
+ job.setIfUnset(TableOutputFormat.OUTPUT_TABLE,
+ job.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
+ outputFormat.checkOutputSpecs(ignored, job);
+ }
+
+ private static class HBaseDirectRecordWriter implements
+ RecordWriter<WritableComparable<?>, Put> {
+
+ private RecordWriter<WritableComparable<?>, Put> baseWriter;
+ private final Long outputVersion;
+
+ public HBaseDirectRecordWriter(
+ RecordWriter<WritableComparable<?>, Put> baseWriter,
+ Long outputVersion) {
+ this.baseWriter = baseWriter;
+ this.outputVersion = outputVersion;
+ }
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new HBaseDirectOutputCommitter(outputFormat.getOutputCommitter(context));
- }
+ @Override
+ public void write(WritableComparable<?> key, Put value)
+ throws IOException {
+ Put put = value;
+ if (outputVersion != null) {
+ put = new Put(value.getRow(), outputVersion.longValue());
+ for (List<KeyValue> row : value.getFamilyMap().values()) {
+ for (KeyValue el : row) {
+ put.add(el.getFamily(), el.getQualifier(), el.getValue());
+ }
+ }
+ }
+ baseWriter.write(key, put);
+ }
- @Override
- public void setConf(Configuration conf) {
- String tableName = conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY);
- conf = new Configuration(conf);
- conf.set(TableOutputFormat.OUTPUT_TABLE,tableName);
- outputFormat.setConf(conf);
- }
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ baseWriter.close(reporter);
+ }
- @Override
- public Configuration getConf() {
- return outputFormat.getConf();
}
- private static class HBaseDirectOutputCommitter extends OutputCommitter {
- private OutputCommitter baseOutputCommitter;
+ public static class HBaseDirectOutputCommitter extends OutputCommitter {
- public HBaseDirectOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException {
- this.baseOutputCommitter = baseOutputCommitter;
+ public HBaseDirectOutputCommitter() throws IOException {
}
@Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.abortTask(context);
+ public void abortTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.commitTask(context);
+ public void commitTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return baseOutputCommitter.needsTaskCommit(context);
+ public boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException {
+ return false;
}
@Override
- public void setupJob(JobContext context) throws IOException {
- baseOutputCommitter.setupJob(context);
+ public void setupJob(JobContext jobContext) throws IOException {
}
@Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- baseOutputCommitter.setupTask(context);
+ public void setupTask(TaskAttemptContext taskContext)
+ throws IOException {
}
@Override
- public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+ public void abortJob(JobContext jobContext, int status)
+ throws IOException {
+ super.abortJob(jobContext, status);
RevisionManager rm = null;
try {
- baseOutputCommitter.abortJob(jobContext, state);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.abortWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ Transaction writeTransaction = HBaseRevisionManagerUtil
+ .getWriteTransaction(jobContext.getConfiguration());
+ rm.abortWriteTransaction(writeTransaction);
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
@@ -124,11 +155,12 @@ class HBaseDirectOutputFormat extends Ou
public void commitJob(JobContext jobContext) throws IOException {
RevisionManager rm = null;
try {
- baseOutputCommitter.commitJob(jobContext);
- rm = HBaseHCatStorageHandler.getOpenedRevisionManager(jobContext.getConfiguration());
- rm.commitWriteTransaction(HBaseHCatStorageHandler.getWriteTransaction(jobContext.getConfiguration()));
+ rm = HBaseRevisionManagerUtil
+ .getOpenedRevisionManager(jobContext.getConfiguration());
+ rm.commitWriteTransaction(HBaseRevisionManagerUtil.getWriteTransaction(jobContext
+ .getConfiguration()));
} finally {
- if(rm != null)
+ if (rm != null)
rm.close();
}
}
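Both record writers in this commit (bulk and direct) rewrite each incoming Put so that every cell carries the write transaction's revision number as its timestamp, which is what lets the snapshot reader filter by revision later. A standalone sketch of that shared pattern against the HBase client API of this era (withRevision is a hypothetical helper name):

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Put;

    public class RevisionedPut {
        // Rebuild the Put with the revision number as the cell timestamp;
        // with no open transaction (null version), pass the Put through as-is.
        static Put withRevision(Put value, Long outputVersion) {
            if (outputVersion == null) {
                return value;
            }
            Put put = new Put(value.getRow(), outputVersion.longValue());
            for (List<KeyValue> cells : value.getFamilyMap().values()) {
                for (KeyValue kv : cells) {
                    put.add(kv.getFamily(), kv.getQualifier(), kv.getValue());
                }
            }
            return put;
        }
    }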
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseHCatStorageHandler.java Sat Mar 3 03:56:13 2012
@@ -19,22 +19,24 @@
package org.apache.hcatalog.hbase;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
@@ -50,13 +52,11 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.hbase.snapshot.RevisionManager;
-import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
-import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
@@ -74,22 +74,93 @@ import com.facebook.fb303.FacebookBase;
* tables through HCatalog. The implementation is very similar to the
* HiveHBaseStorageHandler, with more details to suit HCatalog.
*/
-public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook {
+//TODO remove serializable when HCATALOG-282 is fixed
+public class HBaseHCatStorageHandler extends HCatStorageHandler implements HiveMetaHook, Serializable {
- final static public String DEFAULT_PREFIX = "default.";
+ public final static String DEFAULT_PREFIX = "default.";
+ private final static String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
- private Configuration hbaseConf;
-
- private HBaseAdmin admin;
+ private transient Configuration hbaseConf;
+ private transient HBaseAdmin admin;
@Override
public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- //TODO complete rework and fill this in
+ // Populate jobProperties with input table name, table columns, RM snapshot,
+ // hbase-default.xml and hbase-site.xml
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ try {
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = inputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+ jobProperties.put(TableInputFormat.INPUT_TABLE, qualifiedTableName);
+
+ Configuration jobConf = getConf();
+ String outputSchema = jobConf.get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+ jobProperties.put(TableInputFormat.SCAN_COLUMNS, getScanColumns(tableInfo, outputSchema));
+
+ String serSnapshot = (String) inputJobInfo.getProperties().get(
+ HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ if (serSnapshot == null) {
+ Configuration conf = addHbaseResources(jobConf);
+ HCatTableSnapshot snapshot = HBaseRevisionManagerUtil.createSnapshot(conf,
+ qualifiedTableName, tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY,
+ HCatUtil.serialize(snapshot));
+ }
+
+ addHbaseResources(jobConf, jobProperties);
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
}
@Override
public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
- //TODO complete rework and fill this in
+ // Populate jobProperties with output table name, hbase-default.xml, hbase-site.xml, OutputJobInfo
+ // Populate RM transaction in OutputJobInfo
+ // In case of bulk mode, populate intermediate output location
+ Map<String, String> tableJobProperties = tableDesc.getJobProperties();
+ String jobString = tableJobProperties.get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+ try {
+ OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+ HCatTableInfo tableInfo = outputJobInfo.getTableInfo();
+ String qualifiedTableName = HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo);
+ jobProperties.put(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, qualifiedTableName);
+
+ Configuration jobConf = getConf();
+ String txnString = outputJobInfo.getProperties().getProperty(
+ HBaseConstants.PROPERTY_WRITE_TXN_KEY);
+ if (txnString == null) {
+ Configuration conf = addHbaseResources(jobConf);
+ Transaction txn = HBaseRevisionManagerUtil.beginWriteTransaction(qualifiedTableName, tableInfo, conf);
+ outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY,
+ HCatUtil.serialize(txn));
+
+ if (isBulkMode(outputJobInfo) && !(outputJobInfo.getProperties()
+ .containsKey(PROPERTY_INT_OUTPUT_LOCATION))) {
+ String tableLocation = tableInfo.getTableLocation();
+ String location = new Path(tableLocation, "REVISION_" + txn.getRevisionNumber())
+ .toString();
+ outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION,
+ location);
+ // We are writing out an intermediate sequenceFile hence
+ // location is not passed in OutputJobInfo.getLocation()
+ // TODO replace this with a mapreduce constant when available
+ jobProperties.put("mapred.output.dir", location);
+ }
+ }
+
+ jobProperties
+ .put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+ addHbaseResources(jobConf, jobProperties);
+ addOutputDependencyJars(jobConf);
+ jobProperties.put("tmpjars", jobConf.get("tmpjars"));
+
+ } catch (IOException e) {
+ throw new IllegalStateException("Error while configuring job properties", e);
+ }
}
/*
@@ -231,7 +302,7 @@ public class HBaseHCatStorageHandler ext
new HTable(hbaseConf, tableDesc.getName());
//Set up znodes in revision manager.
- RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+ RevisionManager rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
if (rm instanceof ZKBasedRevisionManager) {
ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
zkRM.setUpZNodes(tableName, new ArrayList<String>(
@@ -295,36 +366,6 @@ public class HBaseHCatStorageHandler ext
return this;
}
-//TODO finish rework remove this
-// /*
-// * @param tableDesc
-// *
-// * @param jobProperties
-// *
-// * @see org.apache.hcatalog.storagehandler.HCatStorageHandler
-// * #configureTableJobProperties(org.apache.hadoop.hive.ql.plan.TableDesc,
-// * java.util.Map)
-// */
-// @Override
-// public void configureTableJobProperties(TableDesc tableDesc,
-// Map<String, String> jobProperties) {
-// Properties tableProperties = tableDesc.getProperties();
-//
-// jobProperties.put(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-// tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING));
-//
-// String tableName = tableProperties
-// .getProperty(HBaseSerDe.HBASE_TABLE_NAME);
-// if (tableName == null) {
-// tableName = tableProperties.getProperty(Constants.META_TABLE_NAME);
-// if (tableName.startsWith(DEFAULT_PREFIX)) {
-// tableName = tableName.substring(DEFAULT_PREFIX.length());
-// }
-// }
-// jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
-//
-// }
-
private HBaseAdmin getHBaseAdmin() throws MetaException {
try {
if (admin == null) {
@@ -356,14 +397,12 @@ public class HBaseHCatStorageHandler ext
@Override
public Class<? extends InputFormat> getInputFormatClass() {
- //TODO replace this with rework
- return InputFormat.class;
+ return HBaseInputFormat.class;
}
@Override
public Class<? extends OutputFormat> getOutputFormatClass() {
- //TODO replace this with rework
- return SequenceFileOutputFormat.class;
+ return HBaseBaseOutputFormat.class;
}
/*
@@ -397,6 +436,7 @@ public class HBaseHCatStorageHandler ext
private void checkDeleteTable(Table table) throws MetaException {
boolean isExternal = MetaStoreUtils.isExternalTable(table);
String tableName = getHBaseTableName(table);
+ RevisionManager rm = null;
try {
if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
// we have created an HBase table, so we delete it to roll back;
@@ -406,7 +446,7 @@ public class HBaseHCatStorageHandler ext
getHBaseAdmin().deleteTable(tableName);
//Set up znodes in revision manager.
- RevisionManager rm = getOpenedRevisionManager(hbaseConf);
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(hbaseConf);
if (rm instanceof ZKBasedRevisionManager) {
ZKBasedRevisionManager zkRM = (ZKBasedRevisionManager) rm;
zkRM.deleteZNodes(tableName);
@@ -414,6 +454,8 @@ public class HBaseHCatStorageHandler ext
}
} catch (IOException ie) {
throw new MetaException(StringUtils.stringifyException(ie));
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
}
}
@@ -436,9 +478,7 @@ public class HBaseHCatStorageHandler ext
* @param conf
* @throws IOException
*/
- public static void addDependencyJars(Configuration conf) throws IOException {
- //TODO provide a facility/interface for loading/specifying dependencies
- //Ideally this method shouldn't be exposed to the user
+ private void addOutputDependencyJars(Configuration conf) throws IOException {
TableMapReduceUtil.addDependencyJars(conf,
//hadoop-core
Writable.class,
@@ -452,8 +492,6 @@ public class HBaseHCatStorageHandler ext
HCatOutputFormat.class,
//hive hbase storage handler jar
HBaseSerDe.class,
- //hcat hbase storage driver jar
- HBaseOutputStorageDriver.class,
//hive jar
Table.class,
//libthrift jar
@@ -464,152 +502,86 @@ public class HBaseHCatStorageHandler ext
FacebookBase.class);
}
-
/**
- * Creates the latest snapshot of the table.
- *
- * @param jobConf The job configuration.
- * @param hbaseTableName The fully qualified name of the HBase table.
- * @return An instance of HCatTableSnapshot
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String hbaseTableName ) throws IOException {
-
- RevisionManager rm = null;
- TableSnapshot snpt;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(hbaseTableName);
- } finally {
- if (rm != null)
- rm.close();
- }
-
- String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
- if(inputJobString == null){
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
- }
- InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
- HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
- .convertSnapshot(snpt, inputInfo.getTableInfo());
-
- return hcatSnapshot;
+ * Utility method to get a new Configuration with hbase-default.xml and hbase-site.xml added
+ * @param jobConf existing configuration
+ * @return a new Configuration with hbase-default.xml and hbase-site.xml added
+ */
+ private Configuration addHbaseResources(Configuration jobConf) {
+ Configuration conf = new Configuration(jobConf);
+ HBaseConfiguration.addHbaseResources(conf);
+ return conf;
}
/**
- * Creates the snapshot using the revision specified by the user.
- *
- * @param jobConf The job configuration.
- * @param tableName The fully qualified name of the table whose snapshot is being taken.
- * @param revision The revision number to use for the snapshot.
- * @return An instance of HCatTableSnapshot.
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static HCatTableSnapshot createSnapshot(Configuration jobConf,
- String tableName, long revision)
- throws IOException {
-
- TableSnapshot snpt;
- RevisionManager rm = null;
- try {
- rm = getOpenedRevisionManager(jobConf);
- snpt = rm.createSnapshot(tableName, revision);
- } finally {
- if (rm != null)
- rm.close();
- }
-
- String inputJobString = jobConf.get(HCatConstants.HCAT_KEY_JOB_INFO);
- if(inputJobString == null){
- throw new IOException(
- "InputJobInfo information not found in JobContext. "
- + "HCatInputFormat.setInput() not called?");
+ * Utility method to add hbase-default.xml and hbase-site.xml properties to a new map
+ * if they are not already present in the jobConf.
+ * @param jobConf Job configuration
+ * @param newJobProperties Map to which new properties should be added
+ */
+ private void addHbaseResources(Configuration jobConf,
+ Map<String, String> newJobProperties) {
+ Configuration conf = new Configuration(false);
+ HBaseConfiguration.addHbaseResources(conf);
+ for (Entry<String, String> entry : conf) {
+ if (jobConf.get(entry.getKey()) == null)
+ newJobProperties.put(entry.getKey(), entry.getValue());
}
- InputJobInfo inputInfo = (InputJobInfo) HCatUtil.deserialize(inputJobString);
- HCatTableSnapshot hcatSnapshot = HBaseInputStorageDriver
- .convertSnapshot(snpt, inputInfo.getTableInfo());
-
- return hcatSnapshot;
}
- /**
- * Gets an instance of revision manager which is opened.
- *
- * @param jobConf The job configuration.
- * @return RevisionManager An instance of revision manager.
- * @throws IOException
- */
- static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
- Properties properties = new Properties();
- String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
- int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
- if (zkHostList != null) {
- String[] splits = zkHostList.split(",");
- StringBuffer sb = new StringBuffer();
- for (String split : splits) {
- sb.append(split);
- sb.append(':');
- sb.append(port);
- sb.append(',');
- }
-
- sb.deleteCharAt(sb.length() - 1);
- properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
- }
- String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
- if (dataDir != null) {
- properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
- }
- String rmClassName = jobConf.get(
- RevisionManager.REVISION_MGR_IMPL_CLASS,
- ZKBasedRevisionManager.class.getName());
- properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
- RevisionManager revisionManger = RevisionManagerFactory
- .getRevisionManager(properties);
- revisionManger.open();
- return revisionManger;
- }
-
- /**
- * Set snapshot as a property.
- *
- * @param snapshot The HCatTableSnapshot to be passed to the job.
- * @param inpJobInfo The InputJobInfo for the job.
- * @throws IOException
- */
- public void setSnapshot(HCatTableSnapshot snapshot, InputJobInfo inpJobInfo)
- throws IOException {
- String serializedSnp = HCatUtil.serialize(snapshot);
- inpJobInfo.getProperties().setProperty(
- HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY, serializedSnp);
- }
-
- static Transaction getWriteTransaction(Configuration conf) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- return (Transaction) HCatUtil.deserialize(outputJobInfo.getProperties()
- .getProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY));
- }
-
- static void setWriteTransaction(Configuration conf, Transaction txn) throws IOException {
- OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
- outputJobInfo.getProperties().setProperty(HBaseConstants.PROPERTY_WRITE_TXN_KEY, HCatUtil.serialize(txn));
- conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
- }
-
- /**
- * Get the Revision number that will be assigned to this job's output data
- * @param conf configuration of the job
- * @return the revision number used
- * @throws IOException
- */
- public static long getOutputRevision(Configuration conf) throws IOException {
- return getWriteTransaction(conf).getRevisionNumber();
+ public static boolean isBulkMode(OutputJobInfo outputJobInfo) {
+ //Default is false
+ String bulkMode = outputJobInfo.getTableInfo().getStorerInfo().getProperties()
+ .getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY,
+ "false");
+ return "true".equals(bulkMode);
+ }
+
+ private String getScanColumns(HCatTableInfo tableInfo, String outputColSchema) throws IOException {
+ StringBuilder builder = new StringBuilder();
+ String hbaseColumnMapping = tableInfo.getStorerInfo().getProperties()
+ .getProperty(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY);
+ if (outputColSchema == null) {
+ String[] splits = hbaseColumnMapping.split("[,]");
+ for (int i = 0; i < splits.length; i++) {
+ if (!splits[i].equals(HBaseSerDe.HBASE_KEY_COL))
+ builder.append(splits[i]).append(" ");
+ }
+ } else {
+ HCatSchema outputSchema = (HCatSchema) HCatUtil.deserialize(outputColSchema);
+ HCatSchema tableSchema = tableInfo.getDataColumns();
+ List<String> outputFieldNames = outputSchema.getFieldNames();
+ List<Integer> outputColumnMapping = new ArrayList<Integer>();
+ for(String fieldName: outputFieldNames){
+ int position = tableSchema.getPosition(fieldName);
+ outputColumnMapping.add(position);
+ }
+ try {
+ List<String> columnFamilies = new ArrayList<String>();
+ List<String> columnQualifiers = new ArrayList<String>();
+ HBaseSerDe.parseColumnMapping(hbaseColumnMapping, columnFamilies, null,
+ columnQualifiers, null);
+ for (int i = 0; i < outputColumnMapping.size(); i++) {
+ int cfIndex = outputColumnMapping.get(i);
+ String cf = columnFamilies.get(cfIndex);
+ // We skip the key column.
+ if (cf.equals(HBaseSerDe.HBASE_KEY_COL) == false) {
+ String qualifier = columnQualifiers.get(i);
+ builder.append(cf);
+ builder.append(":");
+ if (qualifier != null) {
+ builder.append(qualifier);
+ }
+ builder.append(" ");
+ }
+ }
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ }
+ //Remove the extra space delimiter
+ builder.deleteCharAt(builder.length() - 1);
+ return builder.toString();
}
}
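getScanColumns above derives the TableInputFormat scan-column list from the table's hbase.columns.mapping property, skipping the row-key pseudo-column. A simplified, self-contained sketch of the no-projection case (scanColumns is a hypothetical name; ":key" is the value of HBaseSerDe.HBASE_KEY_COL):

    public class ScanColumns {
        // ":key,info:name,info:age" -> "info:name info:age"
        static String scanColumns(String hbaseColumnMapping) {
            StringBuilder builder = new StringBuilder();
            for (String col : hbaseColumnMapping.split(",")) {
                if (!col.equals(":key")) { // skip the row-key pseudo-column
                    builder.append(col).append(' ');
                }
            }
            if (builder.length() > 0) {
                builder.deleteCharAt(builder.length() - 1); // trailing space
            }
            return builder.toString();
        }

        public static void main(String[] args) {
            System.out.println(scanColumns(":key,info:name,info:age"));
        }
    }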
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Sat Mar 3 03:56:13 2012
@@ -21,33 +21,31 @@ package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
-import org.apache.hadoop.hbase.mapreduce.TableSplit;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.InputJobInfo;
/**
* This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
*/
-class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> implements Configurable{
+class HBaseInputFormat implements InputFormat<ImmutableBytesWritable, Result> {
private final TableInputFormat inputFormat;
- private final InputJobInfo jobInfo;
- private Configuration conf;
- public HBaseInputFormat(InputJobInfo jobInfo) {
+ public HBaseInputFormat() {
inputFormat = new TableInputFormat();
- this.jobInfo = jobInfo;
}
/*
@@ -67,20 +65,27 @@ class HBaseInputFormat extends InputForm
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
- public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
- InputSplit split, TaskAttemptContext tac) throws IOException,
- InterruptedException {
-
- String tableName = inputFormat.getConf().get(TableInputFormat.INPUT_TABLE);
- TableSplit tSplit = (TableSplit) split;
- HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(jobInfo);
- Scan sc = new Scan(inputFormat.getScan());
- sc.setStartRow(tSplit.getStartRow());
- sc.setStopRow(tSplit.getEndRow());
- recordReader.setScan(sc);
- recordReader.setHTable(new HTable(this.conf, tableName));
- recordReader.init();
- return recordReader;
+ public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
+ InputSplit split, JobConf job, Reporter reporter)
+ throws IOException {
+ String jobString = job.get(HCatConstants.HCAT_KEY_JOB_INFO);
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobString);
+
+ String tableName = job.get(TableInputFormat.INPUT_TABLE);
+ TableSplit tSplit = (TableSplit) split;
+ HbaseSnapshotRecordReader recordReader = new HbaseSnapshotRecordReader(inputJobInfo, job);
+ inputFormat.setConf(job);
+ Scan inputScan = inputFormat.getScan();
+ // TODO: Make the caching configurable by the user
+ inputScan.setCaching(200);
+ inputScan.setCacheBlocks(false);
+ Scan sc = new Scan(inputScan);
+ sc.setStartRow(tSplit.getStartRow());
+ sc.setStopRow(tSplit.getEndRow());
+ recordReader.setScan(sc);
+ recordReader.setHTable(new HTable(job, tableName));
+ recordReader.init();
+ return recordReader;
}
/*
@@ -97,35 +102,24 @@ class HBaseInputFormat extends InputForm
* .JobContext)
*/
@Override
- public List<InputSplit> getSplits(JobContext jobContext)
- throws IOException, InterruptedException {
-
- String tableName = this.conf.get(TableInputFormat.INPUT_TABLE);
- if (tableName == null) {
- throw new IOException("The input table is not set. The input splits cannot be created.");
+ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplits)
+ throws IOException {
+ inputFormat.setConf(job);
+ return convertSplits(inputFormat.getSplits(HCatMapRedUtil.createJobContext(job, null,
+ Reporter.NULL)));
+ }
+
+ private InputSplit[] convertSplits(List<org.apache.hadoop.mapreduce.InputSplit> splits) {
+ InputSplit[] converted = new InputSplit[splits.size()];
+ for (int i = 0; i < splits.size(); i++) {
+ org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit =
+ (org.apache.hadoop.hbase.mapreduce.TableSplit) splits.get(i);
+ TableSplit newTableSplit = new TableSplit(tableSplit.getTableName(),
+ tableSplit.getStartRow(),
+ tableSplit.getEndRow(), tableSplit.getRegionLocation());
+ converted[i] = newTableSplit;
}
- return inputFormat.getSplits(jobContext);
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- inputFormat.setConf(conf);
- }
-
- public Scan getScan() {
- return inputFormat.getScan();
- }
-
- public void setScan(Scan scan) {
- inputFormat.setScan(scan);
- }
-
- /* @return
- * @see org.apache.hadoop.conf.Configurable#getConf()
- */
- @Override
- public Configuration getConf() {
- return this.conf;
+ return converted;
}
}
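A note on the API shift above: the old class received its InputJobInfo through the constructor, but the mapred-based replacement must pull everything from the JobConf, since the mapred runtime instantiates input formats reflectively through a no-arg constructor. The round trip looks roughly like the sketch below (JobInfoRoundTripSketch is an illustrative name, and HCatUtil.serialize is assumed here to be the counterpart of the HCatUtil.deserialize call used in the patch):

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    class JobInfoRoundTripSketch {
        // Submitting side: stash the serialized job info where the task
        // side will look for it.
        static void store(JobConf job, InputJobInfo info) throws IOException {
            job.set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(info));
        }

        // Task side: mirrors the first two lines of getRecordReader() above.
        static InputJobInfo load(JobConf job) throws IOException {
            return (InputJobInfo) HCatUtil.deserialize(
                    job.get(HCatConstants.HCAT_KEY_JOB_INFO));
        }
    }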
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HbaseSnapshotRecordReader.java Sat Mar 3 03:56:13 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -27,15 +28,20 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.RecordReader;
import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
+import org.apache.hcatalog.hbase.snapshot.RevisionManager;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.mapreduce.InputJobInfo;
@@ -43,59 +49,93 @@ import org.apache.hcatalog.mapreduce.Inp
* The class HbaseSnapshotRecordReader implements logic for filtering records
* based on a table snapshot.
*/
-class HbaseSnapshotRecordReader extends TableRecordReader {
+class HbaseSnapshotRecordReader implements RecordReader<ImmutableBytesWritable, Result> {
static final Log LOG = LogFactory.getLog(HbaseSnapshotRecordReader.class);
+ private final InputJobInfo inpJobInfo;
+ private final Configuration conf;
+ private final int maxRevisions = 1;
private ResultScanner scanner;
private Scan scan;
private HTable htable;
- private ImmutableBytesWritable key;
- private Result value;
- private InputJobInfo inpJobInfo;
private TableSnapshot snapshot;
- private int maxRevisions;
private Iterator<Result> resultItr;
+ private Set<Long> allAbortedTransactions;
+ private DataOutputBuffer valueOut = new DataOutputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
-
- HbaseSnapshotRecordReader(InputJobInfo inputJobInfo) throws IOException {
+ HbaseSnapshotRecordReader(InputJobInfo inputJobInfo, Configuration conf) throws IOException {
this.inpJobInfo = inputJobInfo;
- String snapshotString = inpJobInfo.getProperties().getProperty(
- HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
+ this.conf = conf;
+ String snapshotString = conf.get(HBaseConstants.PROPERTY_TABLE_SNAPSHOT_KEY);
HCatTableSnapshot hcatSnapshot = (HCatTableSnapshot) HCatUtil
.deserialize(snapshotString);
- this.snapshot = HBaseInputStorageDriver.convertSnapshot(hcatSnapshot,
+ this.snapshot = HBaseRevisionManagerUtil.convertSnapshot(hcatSnapshot,
inpJobInfo.getTableInfo());
- this.maxRevisions = 1;
}
- /* @param firstRow The first record in the split.
- /* @throws IOException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#restart(byte[])
- */
- @Override
+ public void init() throws IOException {
+ restart(scan.getStartRow());
+ }
+
public void restart(byte[] firstRow) throws IOException {
+ allAbortedTransactions = getAbortedTransactions(Bytes.toString(htable.getTableName()), scan);
+ long maxValidRevision = snapshot.getLatestRevision();
+ while (allAbortedTransactions.contains(maxValidRevision)) {
+ maxValidRevision--;
+ }
+ long minValidRevision = getMinimumRevision(scan, snapshot);
+ while (allAbortedTransactions.contains(minValidRevision)) {
+ minValidRevision--;
+ }
Scan newScan = new Scan(scan);
newScan.setStartRow(firstRow);
+ //TODO: See if filters in 0.92 can be used to optimize the scan
+ //TODO: Consider creating a custom snapshot filter
+ newScan.setTimeRange(minValidRevision, maxValidRevision + 1);
+ newScan.setMaxVersions();
this.scanner = this.htable.getScanner(newScan);
resultItr = this.scanner.iterator();
}
- /* @throws IOException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#init()
- */
- @Override
- public void init() throws IOException {
- restart(scan.getStartRow());
+ private Set<Long> getAbortedTransactions(String tableName, Scan scan) throws IOException {
+ Set<Long> abortedTransactions = new HashSet<Long>();
+ RevisionManager rm = null;
+ try {
+ rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
+ byte[][] families = scan.getFamilies();
+ for (byte[] familyKey : families) {
+ String family = Bytes.toString(familyKey);
+ List<FamilyRevision> abortedWriteTransactions = rm.getAbortedWriteTransactions(
+ tableName, family);
+ if (abortedWriteTransactions != null) {
+ for (FamilyRevision revision : abortedWriteTransactions) {
+ abortedTransactions.add(revision.getRevision());
+ }
+ }
+ }
+ return abortedTransactions;
+ } finally {
+ HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
+ }
+ }
+
+ private long getMinimumRevision(Scan scan, TableSnapshot snapshot) {
+ long minRevision = snapshot.getLatestRevision();
+ byte[][] families = scan.getFamilies();
+ for (byte[] familyKey : families) {
+ String family = Bytes.toString(familyKey);
+ long revision = snapshot.getRevision(family);
+ if (revision < minRevision)
+ minRevision = revision;
+ }
+ return minRevision;
}
/*
* @param htable The HTable ( of HBase) to use for the record reader.
*
- * @see
- * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setHTable(org.apache
- * .hadoop.hbase.client.HTable)
*/
- @Override
public void setHTable(HTable htable) {
this.htable = htable;
}
@@ -103,64 +143,51 @@ class HbaseSnapshotRecordReader extends
/*
* @param scan The scan to be used for reading records.
*
- * @see
- * org.apache.hadoop.hbase.mapreduce.TableRecordReader#setScan(org.apache
- * .hadoop.hbase.client.Scan)
*/
- @Override
public void setScan(Scan scan) {
this.scan = scan;
}
- /*
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#close()
- */
@Override
- public void close() {
- this.resultItr = null;
- this.scanner.close();
+ public ImmutableBytesWritable createKey() {
+ return new ImmutableBytesWritable();
}
- /* @return The row of hbase record.
- /* @throws IOException
- /* @throws InterruptedException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentKey()
- */
@Override
- public ImmutableBytesWritable getCurrentKey() throws IOException,
- InterruptedException {
- return key;
+ public Result createValue() {
+ return new Result();
}
- /* @return Single row result of scan of HBase table.
- /* @throws IOException
- /* @throws InterruptedException
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getCurrentValue()
- */
@Override
- public Result getCurrentValue() throws IOException, InterruptedException {
- return value;
+ public long getPos() {
+ // This should be the ordinal tuple in the range;
+ // not clear how to calculate...
+ return 0;
}
- /* @return Returns whether a next key-value is available for reading.
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#nextKeyValue()
- */
@Override
- public boolean nextKeyValue() {
+ public float getProgress() throws IOException {
+ // Depends on the total number of tuples
+ return 0;
+ }
+ @Override
+ public boolean next(ImmutableBytesWritable key, Result value) throws IOException {
if (this.resultItr == null) {
LOG.warn("The HBase result iterator is found null. It is possible"
+ " that the record reader has already been closed.");
} else {
-
- if (key == null)
- key = new ImmutableBytesWritable();
while (resultItr.hasNext()) {
Result temp = resultItr.next();
Result hbaseRow = prepareResult(temp.list());
if (hbaseRow != null) {
+ // Update key and value. Currently no way to avoid serialization/de-serialization
+ // as no setters are available.
key.set(hbaseRow.getRow());
- value = hbaseRow;
+ valueOut.reset();
+ hbaseRow.write(valueOut);
+ valueIn.reset(valueOut.getData(), valueOut.getLength());
+ value.readFields(valueIn);
return true;
}
@@ -185,6 +212,11 @@ class HbaseSnapshotRecordReader extends
}
String family = Bytes.toString(kv.getFamily());
+ //Ignore aborted transactions
+ if (allAbortedTransactions.contains(kv.getTimestamp())) {
+ continue;
+ }
+
long desiredTS = snapshot.getRevision(family);
if (kv.getTimestamp() <= desiredTS) {
kvs.add(kv);
@@ -213,13 +245,13 @@ class HbaseSnapshotRecordReader extends
}
}
- /* @return The progress of the record reader.
- * @see org.apache.hadoop.hbase.mapreduce.TableRecordReader#getProgress()
+ /*
+ * @see org.apache.hadoop.hbase.mapred.TableRecordReader#close()
*/
@Override
- public float getProgress() {
- // Depends on the total number of tuples
- return 0;
+ public void close() {
+ this.resultItr = null;
+ this.scanner.close();
}
}
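The reworked reader enforces snapshot isolation in two layers: restart() narrows the scan server-side with setTimeRange() and setMaxVersions(), and prepareResult() re-checks each cell client-side. Since cell timestamps double as write-transaction revisions in this design, the per-cell rule distills to the predicate below (a sketch, not code from the patch):

    import java.util.Set;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hcatalog.hbase.snapshot.TableSnapshot;

    class SnapshotVisibilitySketch {
        // A cell is visible in the snapshot iff its revision was not
        // aborted and is no newer than the revision recorded for its
        // column family.
        static boolean isVisible(KeyValue kv, TableSnapshot snapshot,
                Set<Long> abortedRevisions) {
            String family = Bytes.toString(kv.getFamily());
            return !abortedRevisions.contains(kv.getTimestamp())
                    && kv.getTimestamp() <= snapshot.getRevision(family);
        }
    }

The buffer copy in next(), by contrast, is purely mechanical: the mapred RecordReader contract fills caller-supplied objects, and Result exposes no setters, hence the write()/readFields() round trip through DataOutputBuffer and DataInputBuffer.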
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/FamilyRevision.java Sat Mar 3 03:56:13 2012
@@ -25,7 +25,7 @@ package org.apache.hcatalog.hbase.snapsh
* family and stored in the corresponding znode. When a write transaction is
* committed, the transaction object is removed from the list.
*/
-class FamilyRevision implements
+public class FamilyRevision implements
Comparable<FamilyRevision> {
private long revision;
@@ -42,11 +42,11 @@ class FamilyRevision implements
this.timestamp = ts;
}
- long getRevision() {
+ public long getRevision() {
return revision;
}
- long getExpireTimestamp() {
+ public long getExpireTimestamp() {
return timestamp;
}
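FamilyRevision and its getters become public because snapshot filtering now lives in org.apache.hcatalog.hbase, outside the snapshot package. A consumer collapses the aborted-transaction list to a set of revision ids, just as getAbortedTransactions() does in the record reader above (illustrative sketch; the null check mirrors the patch, which tolerates a null list):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;

    class FamilyRevisionSketch {
        // Reduces aborted write transactions to their revision ids.
        static Set<Long> revisionIds(List<FamilyRevision> aborted) {
            Set<Long> ids = new HashSet<Long>();
            if (aborted != null) {
                for (FamilyRevision fr : aborted) {
                    ids.add(fr.getRevision()); // public as of this change
                }
            }
            return ids;
        }
    }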
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Sat Mar 3 03:56:13 2012
@@ -89,6 +89,17 @@ public interface RevisionManager {
throws IOException;
/**
+ * Get the list of aborted write transactions for a column family.
+ *
+ * @param table the table name
+ * @param columnFamily the column family name
+ * @return a list of aborted write transactions
+ * @throws java.io.IOException
+ */
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException;
+
+ /**
* Create the latest snapshot of the table.
*
* @param tableName
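Hoisting getAbortedWriteTransactions() into the RevisionManager interface keeps callers implementation-agnostic; they only need the open/use/close discipline. The intended pattern, matching the record reader above (sketch; it assumes the HBaseRevisionManagerUtil helpers added in this patch are visible to the caller):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hcatalog.hbase.HBaseRevisionManagerUtil;
    import org.apache.hcatalog.hbase.snapshot.FamilyRevision;
    import org.apache.hcatalog.hbase.snapshot.RevisionManager;

    class RevisionManagerUsageSketch {
        static List<FamilyRevision> abortedFor(Configuration conf,
                String table, String family) throws IOException {
            RevisionManager rm = null;
            try {
                rm = HBaseRevisionManagerUtil.getOpenedRevisionManager(conf);
                // Interface call: no compile-time dependency on
                // ZKBasedRevisionManager.
                return rm.getAbortedWriteTransactions(table, family);
            } finally {
                // Close quietly even if the lookup throws.
                HBaseRevisionManagerUtil.closeRevisionManagerQuietly(rm);
            }
        }
    }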
Modified: incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java?rev=1296572&r1=1296571&r2=1296572&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java (original)
+++ incubator/hcatalog/branches/branch-0.4/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ZKBasedRevisionManager.java Sat Mar 3 03:56:13 2012
@@ -365,14 +365,8 @@ public class ZKBasedRevisionManager impl
return zkUtil.getTransactionList(path);
}
- /**
- * Get the list of aborted Transactions for a column family
- * @param table the table name
- * @param columnFamily the column family name
- * @return a list of aborted WriteTransactions
- * @throws java.io.IOException
- */
- List<FamilyRevision> getAbortedWriteTransactions(String table,
+ @Override
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
String columnFamily) throws IOException {
String path = PathUtil.getAbortInformationPath(baseDir, table, columnFamily);
return zkUtil.getTransactionList(path);