You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2017/03/23 17:27:43 UTC (list and sender addresses were stripped from this copy)

[12/50] [abbrv] bigtop git commit: Progress so far. Doesn't work yet, but committing to avoid another data loss.

Progress so far.  Doesn't work yet, but committing to avoid another data loss.

(cherry picked from commit 18ee8453c11e7fd7c25af75e6c403753db11d5f5)


Branch: refs/heads/master
Commit: c313795409472dfffda49a4ffcb6dc6c59f9c5a9
Parents: 241c839
Author: Alan Gates <>
Authored: Mon Nov 7 15:39:54 2016 -0800
Committer: Roman Shaposhnik <>
Committed: Thu Mar 23 10:27:12 2017 -0700

 bigtop-tests/spec-tests/runtime/build.gradle    |   8 +-
 .../org/odpi/specs/runtime/hive/HCatalogMR.java | 124 ++++++++++
 .../odpi/specs/runtime/hive/JdbcConnector.java  |   3 +
 .../odpi/specs/runtime/hive/TestHCatalog.java   | 224 +++++++++++++++++++
 .../org/odpi/specs/runtime/hive/TestThrift.java |   2 +-
 5 files changed, 359 insertions(+), 2 deletions(-)
diff --git a/bigtop-tests/spec-tests/runtime/build.gradle b/bigtop-tests/spec-tests/runtime/build.gradle
index 5505550..f0166c9 100644
--- a/bigtop-tests/spec-tests/runtime/build.gradle
+++ b/bigtop-tests/spec-tests/runtime/build.gradle
@@ -17,6 +17,8 @@
 def junitVersion = '4.11'
+apply plugin: 'java'
 repositories {
   maven {
     url ""
@@ -31,8 +33,12 @@ dependencies {
   compile group: 'org.apache.hive', name: 'hive-common', version: '1.2.1'
   compile group: 'org.apache.thrift', name: 'libfb303', version: '0.9.3'
   compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
-  testCompile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2'
+  compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2'
+  compile group: 'org.apache.hive.hcatalog', name: 'hive-hcatalog-core', version: '1.2.1'
   testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.2'
+  compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-jobclient', version: '2.7.2'
+  testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-common', version: '2.7.2'
+  testCompile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.2'
   testCompile group: 'org.apache.hive', name: 'hive-exec', version: '1.2.1'
   if (System.env.HADOOP_CONF_DIR) testRuntime files(System.env.HADOOP_CONF_DIR)
diff --git a/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java
new file mode 100644
index 0000000..4a733d6
--- /dev/null
+++ b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.odpi.specs.runtime.hive;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import java.util.StringTokenizer;
+public class HCatalogMR extends Configured implements Tool {
+  private final static String INPUT_SCHEMA = "odpi.test.hcat.schema.input";
+  private final static String OUTPUT_SCHEMA = "odpi.test.hcat.schema.output";
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    args = new GenericOptionsParser(conf, args).getRemainingArgs();
+    String inputTable = args[0];
+    String outputTable = args[1];
+    String inputSchemaStr = args[2];
+    String outputSchemaStr = args[3];
+    conf.set(INPUT_SCHEMA, inputSchemaStr);
+    conf.set(OUTPUT_SCHEMA, outputSchemaStr);
+    Job job = new Job(conf, "odpi_hcat_test");
+    HCatInputFormat.setInput(job, "default", inputTable);
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setJarByClass(HCatalogMR.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputKeyClass(WritableComparable.class);
+    job.setOutputValueClass(HCatRecord.class);
+    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null));
+    HCatOutputFormat.setSchema(job, HCatSchemaUtils.getHCatSchema(outputSchemaStr));
+    job.setOutputFormatClass(HCatOutputFormat.class);
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-exec-1.2.1.jar"));
+    return job.waitForCompletion(true) ? 0 : 1;
+  }
+  public static class Map extends Mapper<WritableComparable,
+          HCatRecord, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    private HCatSchema inputSchema = null;
+    @Override
+    protected void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      if (inputSchema == null) {
+        inputSchema =
+            HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(INPUT_SCHEMA));
+      }
+      String line = value.getString("line", inputSchema);
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+  public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
+    private HCatSchema outputSchema = null;
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws
+        IOException, InterruptedException {
+      if (outputSchema == null) {
+        outputSchema =
+            HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(OUTPUT_SCHEMA));
+      }
+      int sum = 0;
+      for (IntWritable i : values) {
+        sum += i.get();
+      }
+      HCatRecord output = new DefaultHCatRecord(2);
+      output.set("word", outputSchema, key);
+      output.set("count", outputSchema, sum);
+      context.write(null, output);
+    }
+  }
+ }
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
index f5cc379..7512dab 100644
--- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
@@ -36,6 +36,9 @@ public class JdbcConnector {
   protected static final String LOCATION = "odpi.test.hive.location";
   protected static final String METASTORE_URL = "odpi.test.hive.metastore.url";
   protected static final String TEST_THRIFT = "odpi.test.hive.thrift.test";
+  protected static final String TEST_HCATALOG = "odpi.test.hive.hcatalog.test";
+  protected static final String HIVE_CONF_DIR = "odpi.test.hive.conf.dir";
+  protected static final String HADOOP_CONF_DIR = "odpi.test.hadoop.conf.dir";
   protected static Connection conn;
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java
new file mode 100644
index 0000000..4b61131
--- /dev/null
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.odpi.specs.runtime.hive;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+public class TestHCatalog {
+  private static final Log LOG = LogFactory.getLog(TestHCatalog.class.getName());
+  private static IMetaStoreClient client = null;
+  private static HiveConf conf;
+  private static HCatSchema inputSchema;
+  private static HCatSchema outputSchema;
+  private Random rand;
+  @BeforeClass
+  public static void connect() throws MetaException {
+    if (JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog ")) {
+      String hiveConfDir = JdbcConnector.getProperty(JdbcConnector.HIVE_CONF_DIR,
+          "Hive conf directory ");
+      String hadoopConfDir = JdbcConnector.getProperty(JdbcConnector.HADOOP_CONF_DIR,
+          "Hadoop conf directory ");
+      conf = new HiveConf();
+      String fileSep = System.getProperty("file.separator");
+      conf.addResource(new Path(hadoopConfDir + fileSep + "core-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "hdfs-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "yarn-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "mapred-site.xml"));
+      conf.addResource(new Path(hiveConfDir + fileSep + "hive-site.xml"));
+      client = new HiveMetaStoreClient(conf);
+    }
+  }
+  @Before
+  public void checkIfActive() {
+    Assume.assumeTrue(JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog "));
+    rand = new Random();
+  }
+  @Test
+  public void hcatInputFormatOutputFormat() throws TException, IOException, ClassNotFoundException,
+      InterruptedException, URISyntaxException {
+    // Create a table to write to
+    final String inputTable = "odpi_hcat_input_table_" + rand.nextInt(Integer.MAX_VALUE);
+    SerDeInfo serde = new SerDeInfo("default_serde",
+        conf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE), new HashMap<String, String>());
+    FieldSchema schema = new FieldSchema("line", "string", "");
+    inputSchema = new HCatSchema(Collections.singletonList(new HCatFieldSchema(schema.getName(),
+        HCatFieldSchema.Type.STRING, schema.getComment())));
+    StorageDescriptor sd = new StorageDescriptor(Collections.singletonList(schema), null,
+        "org.apache.hadoop.mapred.TextInputFormat",
+        "", false, 0, serde, null, null,
+        new HashMap<String, String>());
+    Table table = new Table(inputTable, "default", "me", 0, 0, 0, sd, null,
+        new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString());
+    client.createTable(table);
+    final String outputTable = "odpi_hcat_output_table_" + rand.nextInt(Integer.MAX_VALUE);
+    sd = new StorageDescriptor(Arrays.asList(
+          new FieldSchema("word", "string", ""),
+          new FieldSchema("count", "int", "")),
+        null, "org.apache.hadoop.mapred.TextInputFormat",
+        "", false, 0, serde, null, null,
+        new HashMap<String, String>());
+    table = new Table(outputTable, "default", "me", 0, 0, 0, sd, null,
+        new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString());
+    client.createTable(table);
+    outputSchema = new HCatSchema(Arrays.asList(
+        new HCatFieldSchema("word", HCatFieldSchema.Type.STRING, ""),
+        new HCatFieldSchema("count", HCatFieldSchema.Type.INT, "")));
+    // TODO Could I use HCatWriter here and the reader to read it?
+    // Write some stuff into a file in the location of the table
+    table = client.getTable("default", inputTable);
+    String inputFile = table.getSd().getLocation() + "/input";
+    /*
+    String inputFile = JdbcConnector.getProperty(JdbcConnector.LOCATION,
+        "Directory to write a file in ") + "/odpi_hcat_input_" + rand.nextInt(Integer.MAX_VALUE);
+        */
+    Path inputPath = new Path(inputFile);
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream out = fs.create(inputPath);
+    out.writeChars("Mary had a little lamb\n");
+    out.writeChars("its fleece was white as snow\n");
+    out.writeChars("and everywhere that Mary went\n");
+    out.writeChars("the lamb was sure to go\n");
+    out.close();
+    Map<String, String> results = HiveHelper.execCommand(new CommandLine("hadoop")
+        .addArgument("jar")
+        .addArgument("/Users/gates/git/bigtop/runtime-1.2.0-SNAPSHOT.jar")
+        .addArgument(HCatalogMR.class.getName())
+        .addArgument(inputTable)
+        .addArgument(outputTable)
+        .addArgument(inputSchema.getSchemaAsTypeString())
+        .addArgument(outputSchema.getSchemaAsTypeString()));
+    Assert.assertEquals("HCat job failed", 0, Integer.parseInt(results.get("exitValue")));
+    /*
+    Job job = new Job(conf, "odpi_hcat_test");
+    HCatInputFormat.setInput(job, "default", inputTable);
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setJarByClass(TestHCatalog.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputKeyClass(WritableComparable.class);
+    job.setOutputValueClass(HCatRecord.class);
+    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null));
+    HCatOutputFormat.setSchema(job, outputSchema);
+    job.setOutputFormatClass(HCatOutputFormat.class);
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-exec-1.2.1.jar"));
+    Assert.assertTrue(job.waitForCompletion(true));
+    */
+    client.dropTable("default", inputTable);
+    client.dropTable("default", outputTable);
+  }
+  /*
+  public static class Map extends Mapper<WritableComparable,
+        HCatRecord, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    @Override
+    protected void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      String line = value.getString("line", inputSchema);
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+  public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws
+        IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable i : values) {
+        sum += i.get();
+      }
+      HCatRecord output = new DefaultHCatRecord(2);
+      output.set("word", outputSchema, key);
+      output.set("count", outputSchema, sum);
+      context.write(null, output);
+    }
+  }
+  */
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
index 5eaab95..8e0abda 100644
--- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
@@ -45,7 +45,7 @@ import java.util.Random;
 public class TestThrift {
-  private static final Log LOG = LogFactory.getLog(JdbcConnector.class.getName());
+  private static final Log LOG = LogFactory.getLog(TestThrift.class.getName());
   private static IMetaStoreClient client = null;
   private static HiveConf conf;