You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by yc...@apache.org on 2018/10/10 13:06:21 UTC
hive git commit: HIVE-20678: HiveHBaseTableOutputFormat should
implement HiveOutputFormat to ensure compatibility (Alice Fan and Yun Zhao,
reviewed by Yongzhi Chen)
Repository: hive
Updated Branches:
refs/heads/master 90e12280c -> 390afb5ef
HIVE-20678: HiveHBaseTableOutputFormat should implement HiveOutputFormat to ensure compatibility (Alice Fan and Yun Zhao, reviewed by Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/390afb5e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/390afb5e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/390afb5e
Branch: refs/heads/master
Commit: 390afb5ef5a778c8b4ef823d2bb0578f0a6d0660
Parents: 90e1228
Author: Yongzhi Chen <yc...@cloudera.com>
Authored: Wed Oct 10 09:03:18 2018 -0400
Committer: Yongzhi Chen <yc...@cloudera.com>
Committed: Wed Oct 10 09:03:18 2018 -0400
----------------------------------------------------------------------
.../hive/hbase/HiveHBaseTableOutputFormat.java | 64 +++++++++++---------
.../hbase/TestHiveHBaseTableOutputFormat.java | 38 ++++++++++++
2 files changed, 74 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/390afb5e/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
index 08576c2..b344e16 100644
--- a/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
+++ b/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
@@ -19,11 +19,16 @@
package org.apache.hadoop.hive.hbase;
import java.io.IOException;
+import java.util.Properties;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -37,7 +42,6 @@ import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -48,28 +52,12 @@ import org.apache.hadoop.security.UserGroupInformation;
/**
* HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
- * We also need to implement the @deprecated org.apache.hadoop.mapred.OutFormat<?,?>
- * class to keep it compliant with Hive interfaces.
*/
public class HiveHBaseTableOutputFormat extends
TableOutputFormat<ImmutableBytesWritable> implements
- OutputFormat<ImmutableBytesWritable, Object> {
+ HiveOutputFormat<ImmutableBytesWritable, Object> {
static final Logger LOG = LoggerFactory.getLogger(HiveHBaseTableOutputFormat.class);
- public static final String HBASE_WAL_ENABLED = "hive.hbase.wal.enabled";
-
- /**
- * Update the out table, and output an empty key as the key.
- *
- * @param jc the job configuration file
- * @param finalOutPath the final output table name
- * @param valueClass the value class
- * @param isCompressed whether the content is compressed or not
- * @param tableProperties the table info of the corresponding table
- * @param progress progress used for status report
- * @return the RecordWriter for the output file
- */
-
@Override
public void checkOutputSpecs(FileSystem fs, JobConf jc) throws IOException {
@@ -99,7 +87,23 @@ public class HiveHBaseTableOutputFormat extends
JobConf jobConf,
String name,
Progressable progressable) throws IOException {
+ return getMyRecordWriter(jobConf);
+ }
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ return new TableOutputCommitter(); // a new committer on every call; context is not consulted here
+ }
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(
+ JobConf jobConf, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
+ Properties tableProperties, Progressable progress) throws IOException {
+ return getMyRecordWriter(jobConf); // only jobConf is used; the other five arguments are ignored
+ }
+
+ private MyRecordWriter getMyRecordWriter(JobConf jobConf) throws IOException {
String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
jobConf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
final boolean walEnabled = HiveConf.getBoolVar(
@@ -109,14 +113,9 @@ public class HiveHBaseTableOutputFormat extends
return new MyRecordWriter(table, conn, walEnabled);
}
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
- InterruptedException {
- return new TableOutputCommitter();
- }
-
-
- static private class MyRecordWriter implements org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Object> {
+ private static class MyRecordWriter
+ implements org.apache.hadoop.mapred.RecordWriter<ImmutableBytesWritable, Object>,
+ org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
private final BufferedMutator m_table;
private final boolean m_walEnabled;
private final Connection m_connection;
@@ -127,8 +126,7 @@ public class HiveHBaseTableOutputFormat extends
m_connection = connection;
}
- public void close(Reporter reporter)
- throws IOException {
+ public void close(Reporter reporter) throws IOException {
m_table.close();
}
@@ -159,5 +157,15 @@ public class HiveHBaseTableOutputFormat extends
super.finalize();
}
}
+
+ @Override
+ public void write(Writable w) throws IOException {
+ write(null, w); // forwards to the two-argument write, passing null as the key
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ close(null); // abort is ignored; forwards to close(Reporter) with a null reporter
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/390afb5e/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java
new file mode 100644
index 0000000..1bbcada
--- /dev/null
+++ b/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHiveHBaseTableOutputFormat.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import static org.junit.Assert.assertTrue;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * This is a simple test to make sure HiveHBaseTableOutputFormat implements HiveOutputFormat for HBase tables.
+ */
+public class TestHiveHBaseTableOutputFormat {
+
+ @Test
+ public void testInstanceOfHiveHBaseTableOutputFormat() {
+ HiveHBaseTableOutputFormat hBaseOutputFormat = Mockito.mock(HiveHBaseTableOutputFormat.class); // a mock stands in for a real instance
+ assertTrue(hBaseOutputFormat instanceof TableOutputFormat); // still an HBase TableOutputFormat
+ assertTrue(hBaseOutputFormat instanceof HiveOutputFormat); // NOTE(review): follows from the class declaration itself — confirm a runtime check is wanted
+ }
+}