Posted to commits@bigtop.apache.org by rv...@apache.org on 2017/02/24 19:52:43 UTC

[40/50] [abbrv] bigtop git commit: Progress so far. Doesn't work yet, but committing to avoid another data loss.

Progress so far.  Doesn't work yet, but committing to avoid another data loss.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/27ba88b8
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/27ba88b8
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/27ba88b8

Branch: refs/heads/BIGTOP-2666
Commit: 27ba88b8238a5a482fb77e7d09d7902ea07314e6
Parents: 1965443
Author: Alan Gates <ga...@hortonworks.com>
Authored: Mon Nov 7 15:39:54 2016 -0800
Committer: Roman Shaposhnik <rv...@apache.org>
Committed: Fri Feb 24 11:51:23 2017 -0800

----------------------------------------------------------------------
 bigtop-tests/spec-tests/runtime/build.gradle    |   8 +-
 .../org/odpi/specs/runtime/hive/HCatalogMR.java | 124 ++++++++++
 .../odpi/specs/runtime/hive/JdbcConnector.java  |   3 +
 .../odpi/specs/runtime/hive/TestHCatalog.java   | 224 +++++++++++++++++++
 .../org/odpi/specs/runtime/hive/TestThrift.java |   2 +-
 5 files changed, 359 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/27ba88b8/bigtop-tests/spec-tests/runtime/build.gradle
----------------------------------------------------------------------
diff --git a/bigtop-tests/spec-tests/runtime/build.gradle b/bigtop-tests/spec-tests/runtime/build.gradle
index 5505550..f0166c9 100644
--- a/bigtop-tests/spec-tests/runtime/build.gradle
+++ b/bigtop-tests/spec-tests/runtime/build.gradle
@@ -17,6 +17,8 @@
  */
 def junitVersion = '4.11'
 
+apply plugin: 'java'
+
 repositories {
   maven {
     url "http://conjars.org/repo/"
@@ -31,8 +33,12 @@ dependencies {
   compile group: 'org.apache.hive', name: 'hive-common', version: '1.2.1'
   compile group: 'org.apache.thrift', name: 'libfb303', version: '0.9.3'
   compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
-  testCompile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2'
+  compile group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.2'
+  compile group: 'org.apache.hive.hcatalog', name: 'hive-hcatalog-core', version: '1.2.1'
   testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-core', version: '2.7.2'
+  compile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-jobclient', version: '2.7.2'
+  testCompile group: 'org.apache.hadoop', name: 'hadoop-mapreduce-client-common', version: '2.7.2'
+  testCompile group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.2'
   testCompile group: 'org.apache.hive', name: 'hive-exec', version: '1.2.1'
   if (System.env.HADOOP_CONF_DIR) testRuntime files(System.env.HADOOP_CONF_DIR)
 }
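
The dependency changes above track the new code in this commit: hadoop-common moves from testCompile to compile, and hive-hcatalog-core and hadoop-mapreduce-client-jobclient are added to compile, because the new HCatalogMR class lives under src/main rather than src/test and needs the MapReduce and HCatalog APIs at compile time (the explicit apply plugin: 'java' presumably ensures the compile/testCompile configurations and the jar task exist for this subproject). As a throwaway sketch, not part of the commit, a class like this compiles only once those artifacts are on the main classpath:

  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;

  public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
      Job job = Job.getInstance();                    // hadoop-mapreduce-client-core
      job.setInputFormatClass(HCatInputFormat.class); // hive-hcatalog-core
    }
  }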

http://git-wip-us.apache.org/repos/asf/bigtop/blob/27ba88b8/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java
----------------------------------------------------------------------
diff --git a/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java
new file mode 100644
index 0000000..4a733d6
--- /dev/null
+++ b/bigtop-tests/spec-tests/runtime/src/main/java/org/odpi/specs/runtime/hive/HCatalogMR.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.odpi.specs.runtime.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.StringTokenizer;
+
+public class HCatalogMR extends Configured implements Tool {
+  private final static String INPUT_SCHEMA = "odpi.test.hcat.schema.input";
+  private final static String OUTPUT_SCHEMA = "odpi.test.hcat.schema.output";
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    args = new GenericOptionsParser(conf, args).getRemainingArgs();
+
+    String inputTable = args[0];
+    String outputTable = args[1];
+    String inputSchemaStr = args[2];
+    String outputSchemaStr = args[3];
+
+    conf.set(INPUT_SCHEMA, inputSchemaStr);
+    conf.set(OUTPUT_SCHEMA, outputSchemaStr);
+
+    Job job = new Job(conf, "odpi_hcat_test");
+    HCatInputFormat.setInput(job, "default", inputTable);
+
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setJarByClass(HCatalogMR.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputKeyClass(WritableComparable.class);
+    job.setOutputValueClass(HCatRecord.class);
+    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null));
+    HCatOutputFormat.setSchema(job, HCatSchemaUtils.getHCatSchema(outputSchemaStr));
+    job.setOutputFormatClass(HCatOutputFormat.class);
+
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-exec-1.2.1.jar"));
+
+    return job.waitForCompletion(true) ? 0 : 1;
+
+
+  }
+  public static class Map extends Mapper<WritableComparable,
+          HCatRecord, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+    private HCatSchema inputSchema = null;
+
+    @Override
+    protected void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      if (inputSchema == null) {
+        inputSchema =
+            HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(INPUT_SCHEMA));
+      }
+      String line = value.getString("line", inputSchema);
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
+    private HCatSchema outputSchema = null;
+
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws
+        IOException, InterruptedException {
+      if (outputSchema == null) {
+        outputSchema =
+            HCatSchemaUtils.getHCatSchema(context.getConfiguration().get(OUTPUT_SCHEMA));
+      }
+      int sum = 0;
+      for (IntWritable i : values) {
+        sum += i.get();
+      }
+      HCatRecord output = new DefaultHCatRecord(2);
+      output.set("word", outputSchema, key);
+      output.set("count", outputSchema, sum);
+      context.write(null, output);
+    }
+  }
+}
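
As committed, HCatalogMR implements Tool but has no main() method, while TestHCatalog below launches it via "hadoop jar <jar> org.odpi.specs.runtime.hive.HCatalogMR ...", a form that invokes the named class's main(). A minimal sketch of the missing entry point (an assumption, not something in this commit):

  // Hypothetical entry point for HCatalogMR; needs
  // org.apache.hadoop.conf.Configuration and org.apache.hadoop.util.ToolRunner.
  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new HCatalogMR(), args));
  }

Note also that the addCacheArchive() calls pin the job to jars under a personal HDFS home directory (hdfs:/user/gates/...), so it only runs on a cluster staged exactly that way, consistent with the "doesn't work yet" commit message.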

http://git-wip-us.apache.org/repos/asf/bigtop/blob/27ba88b8/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
----------------------------------------------------------------------
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
index f5cc379..7512dab 100644
--- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/JdbcConnector.java
@@ -36,6 +36,9 @@ public class JdbcConnector {
   protected static final String LOCATION = "odpi.test.hive.location";
   protected static final String METASTORE_URL = "odpi.test.hive.metastore.url";
   protected static final String TEST_THRIFT = "odpi.test.hive.thrift.test";
+  protected static final String TEST_HCATALOG = "odpi.test.hive.hcatalog.test";
+  protected static final String HIVE_CONF_DIR = "odpi.test.hive.conf.dir";
+  protected static final String HADOOP_CONF_DIR = "odpi.test.hadoop.conf.dir";
 
   protected static Connection conn;
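
The three new keys follow the existing odpi.test.* naming and are read through the getProperty/testActive helpers that already live in JdbcConnector (outside this hunk): TEST_HCATALOG gates the new TestHCatalog suite, and the two conf-dir keys point it at the Hadoop and Hive client configuration. A sketch of setting them for a local run, with illustrative values that are not from the commit:

  // Values are examples only; real paths depend on the cluster under test.
  // (Must run within the org.odpi.specs.runtime.hive package, since the
  // constants are protected.)
  System.setProperty(JdbcConnector.TEST_HCATALOG, "true");
  System.setProperty(JdbcConnector.HIVE_CONF_DIR, "/etc/hive/conf");
  System.setProperty(JdbcConnector.HADOOP_CONF_DIR, "/etc/hadoop/conf");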
 

http://git-wip-us.apache.org/repos/asf/bigtop/blob/27ba88b8/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java
----------------------------------------------------------------------
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java
new file mode 100644
index 0000000..4b61131
--- /dev/null
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestHCatalog.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.odpi.specs.runtime.hive;
+
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.hive.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.thrift.TException;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+
+public class TestHCatalog {
+
+  private static final Log LOG = LogFactory.getLog(TestHCatalog.class.getName());
+
+  private static IMetaStoreClient client = null;
+  private static HiveConf conf;
+  private static HCatSchema inputSchema;
+  private static HCatSchema outputSchema;
+
+  private Random rand;
+
+  @BeforeClass
+  public static void connect() throws MetaException {
+    if (JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog ")) {
+      String hiveConfDir = JdbcConnector.getProperty(JdbcConnector.HIVE_CONF_DIR,
+          "Hive conf directory ");
+      String hadoopConfDir = JdbcConnector.getProperty(JdbcConnector.HADOOP_CONF_DIR,
+          "Hadoop conf directory ");
+      conf = new HiveConf();
+      String fileSep = System.getProperty("file.separator");
+      conf.addResource(new Path(hadoopConfDir + fileSep + "core-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "hdfs-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "yarn-site.xml"));
+      conf.addResource(new Path(hadoopConfDir + fileSep + "mapred-site.xml"));
+      conf.addResource(new Path(hiveConfDir + fileSep + "hive-site.xml"));
+      client = new HiveMetaStoreClient(conf);
+
+    }
+  }
+
+  @Before
+  public void checkIfActive() {
+    Assume.assumeTrue(JdbcConnector.testActive(JdbcConnector.TEST_HCATALOG, "Test HCatalog "));
+    rand = new Random();
+  }
+
+  @Test
+  public void hcatInputFormatOutputFormat() throws TException, IOException, ClassNotFoundException,
+      InterruptedException, URISyntaxException {
+    // Create a table to write to
+    final String inputTable = "odpi_hcat_input_table_" + rand.nextInt(Integer.MAX_VALUE);
+    SerDeInfo serde = new SerDeInfo("default_serde",
+        conf.getVar(HiveConf.ConfVars.HIVEDEFAULTSERDE), new HashMap<String, String>());
+    FieldSchema schema = new FieldSchema("line", "string", "");
+    inputSchema = new HCatSchema(Collections.singletonList(new HCatFieldSchema(schema.getName(),
+        HCatFieldSchema.Type.STRING, schema.getComment())));
+    StorageDescriptor sd = new StorageDescriptor(Collections.singletonList(schema), null,
+        "org.apache.hadoop.mapred.TextInputFormat",
+        "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat", false, 0, serde, null, null,
+        new HashMap<String, String>());
+    Table table = new Table(inputTable, "default", "me", 0, 0, 0, sd, null,
+        new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString());
+    client.createTable(table);
+
+    final String outputTable = "odpi_hcat_output_table_" + rand.nextInt(Integer.MAX_VALUE);
+    sd = new StorageDescriptor(Arrays.asList(
+          new FieldSchema("word", "string", ""),
+          new FieldSchema("count", "int", "")),
+        null, "org.apache.hadoop.mapred.TextInputFormat",
+        "org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat", false, 0, serde, null, null,
+        new HashMap<String, String>());
+    table = new Table(outputTable, "default", "me", 0, 0, 0, sd, null,
+        new HashMap<String, String>(), null, null, TableType.MANAGED_TABLE.toString());
+    client.createTable(table);
+    outputSchema = new HCatSchema(Arrays.asList(
+        new HCatFieldSchema("word", HCatFieldSchema.Type.STRING, ""),
+        new HCatFieldSchema("count", HCatFieldSchema.Type.INT, "")));
+
+    // TODO Could I use HCatWriter here and the reader to read it?
+    // Write some stuff into a file in the location of the table
+    table = client.getTable("default", inputTable);
+    String inputFile = table.getSd().getLocation() + "/input";
+    /*
+    String inputFile = JdbcConnector.getProperty(JdbcConnector.LOCATION,
+        "Directory to write a file in ") + "/odpi_hcat_input_" + rand.nextInt(Integer.MAX_VALUE);
+        */
+    Path inputPath = new Path(inputFile);
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream out = fs.create(inputPath);
+    out.writeChars("Mary had a little lamb\n");
+    out.writeChars("its fleece was white as snow\n");
+    out.writeChars("and everywhere that Mary went\n");
+    out.writeChars("the lamb was sure to go\n");
+    out.close();
+
+    Map<String, String> results = HiveHelper.execCommand(new CommandLine("hadoop")
+        .addArgument("jar")
+        .addArgument("/Users/gates/git/bigtop/runtime-1.2.0-SNAPSHOT.jar")
+        .addArgument(HCatalogMR.class.getName())
+        .addArgument(inputTable)
+        .addArgument(outputTable)
+        .addArgument(inputSchema.getSchemaAsTypeString())
+        .addArgument(outputSchema.getSchemaAsTypeString()));
+    Assert.assertEquals("HCat job failed", 0, Integer.parseInt(results.get("exitValue")));
+
+
+
+    /*
+    Job job = new Job(conf, "odpi_hcat_test");
+    HCatInputFormat.setInput(job, "default", inputTable);
+
+    job.setInputFormatClass(HCatInputFormat.class);
+    job.setJarByClass(TestHCatalog.class);
+    job.setMapperClass(Map.class);
+    job.setReducerClass(Reduce.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(IntWritable.class);
+    job.setOutputKeyClass(WritableComparable.class);
+    job.setOutputValueClass(HCatRecord.class);
+    HCatOutputFormat.setOutput(job, OutputJobInfo.create("default", outputTable, null));
+    HCatOutputFormat.setSchema(job, outputSchema);
+    job.setOutputFormatClass(HCatOutputFormat.class);
+
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-hcatalog-core-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-metastore-1.2.1.jar"));
+    job.addCacheArchive(new URI("hdfs:/user/gates/hive-exec-1.2.1.jar"));
+
+    Assert.assertTrue(job.waitForCompletion(true));
+    */
+
+    client.dropTable("default", inputTable);
+    client.dropTable("default", outputTable);
+  }
+
+  /*
+  public static class Map extends Mapper<WritableComparable,
+        HCatRecord, Text, IntWritable> {
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    @Override
+    protected void map(WritableComparable key, HCatRecord value, Context context)
+        throws IOException, InterruptedException {
+      String line = value.getString("line", inputSchema);
+      StringTokenizer tokenizer = new StringTokenizer(line);
+      while (tokenizer.hasMoreTokens()) {
+        word.set(tokenizer.nextToken());
+        context.write(word, one);
+      }
+    }
+  }
+
+  public static class Reduce extends Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
+    @Override
+    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws
+        IOException, InterruptedException {
+      int sum = 0;
+      for (IntWritable i : values) {
+        sum += i.get();
+      }
+      HCatRecord output = new DefaultHCatRecord(2);
+      output.set("word", outputSchema, key);
+      output.set("count", outputSchema, sum);
+      context.write(null, output);
+    }
+  }
+  */
+}
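
Two loose ends in this test line up with the commit message: the hadoop jar invocation hard-codes /Users/gates/git/bigtop/runtime-1.2.0-SNAPSHOT.jar, which only resolves on the author's machine, and FSDataOutputStream.writeChars() emits two bytes per character (DataOutput semantics), so TextInputFormat will see NUL-interleaved text rather than the plain lines the word count expects (writeBytes() would write the ASCII the test intends). One way to avoid the hard-coded path, as a sketch rather than the commit's approach, is to resolve the jar that contains HCatalogMR itself:

  // Locate the jar containing HCatalogMR at runtime instead of
  // hard-coding a developer-specific path.
  String jarPath = HCatalogMR.class.getProtectionDomain()
      .getCodeSource().getLocation().getPath();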

http://git-wip-us.apache.org/repos/asf/bigtop/blob/27ba88b8/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
----------------------------------------------------------------------
diff --git a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
index 5eaab95..8e0abda 100644
--- a/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
+++ b/bigtop-tests/spec-tests/runtime/src/test/java/org/odpi/specs/runtime/hive/TestThrift.java
@@ -45,7 +45,7 @@ import java.util.Random;
 
 public class TestThrift {
 
-  private static final Log LOG = LogFactory.getLog(JdbcConnector.class.getName());
+  private static final Log LOG = LogFactory.getLog(TestThrift.class.getName());
 
   private static IMetaStoreClient client = null;
   private static HiveConf conf;