Posted to commits@hive.apache.org by li...@apache.org on 2016/01/28 08:20:32 UTC

[01/11] hive git commit: HIVE-12045: ClassNotFoundException for GenericUDF [Spark Branch] (Rui via Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master 8c65c3455 -> 277825fe8


HIVE-12045: ClassNotFoundException for GenericUDF [Spark Branch] (Rui via Xuefu)

Conflicts:
	ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cd48911
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cd48911
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cd48911

Branch: refs/heads/master
Commit: 5cd4891192e63075b9c0ed2425294a45b3ca8688
Parents: 8c65c34
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Fri Nov 20 14:16:21 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:20:35 2016 +0800

----------------------------------------------------------------------
 .../genericudf/example/GenericUDFAdd10.java     | 151 +++++++++++++++++++
 data/conf/spark/yarn-client/hive-site.xml       |   7 +-
 .../test/resources/testconfiguration.properties |   1 +
 .../ql/exec/spark/HiveSparkClientFactory.java   |   5 +
 .../ql/exec/spark/RemoteHiveSparkClient.java    |   5 +-
 .../hive/ql/exec/spark/SparkUtilities.java      |   3 +-
 .../ql/exec/spark/session/SparkSession.java     |   8 +
 .../ql/exec/spark/session/SparkSessionImpl.java |  39 +++++
 .../clientpositive/gen_udf_example_add10.q      |  13 ++
 .../clientpositive/gen_udf_example_add10.q.out  |  95 ++++++++++++
 .../spark/gen_udf_example_add10.q.out           | 101 +++++++++++++
 .../hive/spark/client/SparkClientUtilities.java |   9 +-
 12 files changed, 431 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
----------------------------------------------------------------------
diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
new file mode 100644
index 0000000..b87de09
--- /dev/null
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFAdd10: returns its numeric argument plus 10.
+ *
+ */
+@Description(name = "add10",
+    value = "_FUNC_(x) - returns 10 plus the original value of x",
+    extended = "Example:\n"
+        + "  > SELECT _FUNC_(0) FROM src LIMIT 1;\n"
+        + "  10\n"
+        + "  > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + "  5")
+public class GenericUDFAdd10 extends GenericUDF {
+  private transient PrimitiveCategory inputType;
+  private final DoubleWritable resultDouble = new DoubleWritable();
+  private final LongWritable resultLong = new LongWritable();
+  private final IntWritable resultInt = new IntWritable();
+  private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
+  private transient PrimitiveObjectInspector argumentOI;
+  private transient Converter inputConverter;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    if (arguments.length != 1) {
+      throw new UDFArgumentLengthException(
+          "ADD10() requires 1 argument, got " + arguments.length);
+    }
+
+    if (arguments[0].getCategory() != Category.PRIMITIVE) {
+      throw new UDFArgumentException(
+          "ADD10 only takes primitive types, got " + arguments[0].getTypeName());
+    }
+    argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+    inputType = argumentOI.getPrimitiveCategory();
+    ObjectInspector outputOI = null;
+    switch (inputType) {
+    case SHORT:
+    case BYTE:
+    case INT:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+      break;
+    case LONG:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+      break;
+    case FLOAT:
+    case STRING:
+    case DOUBLE:
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+      outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+      break;
+    case DECIMAL:
+      outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+          ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
+      inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+          outputOI);
+      break;
+    default:
+      throw new UDFArgumentException(
+          "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+    }
+    return outputOI;
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+    Object valObject = arguments[0].get();
+    if (valObject == null) {
+      return null;
+    }
+    switch (inputType) {
+    case SHORT:
+    case BYTE:
+    case INT:
+      valObject = inputConverter.convert(valObject);
+      resultInt.set(10 + ((IntWritable) valObject).get());
+      return resultInt;
+    case LONG:
+      valObject = inputConverter.convert(valObject);
+      resultLong.set(10 + ((LongWritable) valObject).get());
+      return resultLong;
+    case FLOAT:
+    case STRING:
+    case DOUBLE:
+      valObject = inputConverter.convert(valObject);
+      resultDouble.set(10.0 + ((DoubleWritable) valObject).get());
+      return resultDouble;
+    case DECIMAL:
+      HiveDecimalObjectInspector decimalOI =
+          (HiveDecimalObjectInspector) argumentOI;
+      HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
+
+      if (val != null) {
+        resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10")));
+        val = resultDecimal;
+      }
+      return val;
+    default:
+      throw new UDFArgumentException(
+          "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+    }
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("add10", children);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index aef7877..bc42f0d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -196,7 +196,7 @@
 
 <property>
   <name>spark.master</name>
-  <value>yarn-client</value>
+  <value>yarn-cluster</value>
 </property>
 
 <property>
@@ -255,4 +255,9 @@
   <description>Internal marker for test. Used for masking env-dependent values</description>
 </property>
 
+<property>
+  <name>hive.spark.client.connect.timeout</name>
+  <value>30000ms</value>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 8318c3a..4df7d25 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1262,6 +1262,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   empty_dir_in_table.q,\
   external_table_with_space_in_location_path.q,\
   file_with_header_footer.q,\
+  gen_udf_example_add10.q,\
   import_exported_table.q,\
   index_bitmap3.q,\
   index_bitmap_auto.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 259c12f..d215873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -133,6 +133,11 @@ public class HiveSparkClientFactory {
         LOG.info(String.format(
           "load yarn property from hive configuration in %s mode (%s -> %s).",
           sparkMaster, propertyName, value));
+      } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+        String value = hiveConf.get(propertyName);
+        if (value != null && !value.isEmpty()) {
+          sparkConf.put("spark.hadoop." + propertyName, value);
+        }
       }
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6380774..11e7116 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -297,8 +297,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
       // may need to load classes from this jar in other threads.
       Map<String, Long> addedJars = jc.getAddedJars();
       if (addedJars != null && !addedJars.isEmpty()) {
-        SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
-        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
+        List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars,
+            localJobConf, jc.getLocalTmpDir());
+        localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";"));
       }
 
       Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index a61cdc5..5a6bef9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -89,7 +89,8 @@ public class SparkUtilities {
    */
   public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
     Path localFile = new Path(source.getPath());
-    Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
+    Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
+        getFileName(source));
     FileSystem fileSystem = FileSystem.get(conf);
     // Overwrite if the remote file already exists. Whether the file can be added
     // on executor is up to spark, i.e. spark.files.overwrite

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index 3d4b39b..e1f8c1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.session;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -24,6 +25,8 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import java.io.IOException;
+
 public interface SparkSession {
   /**
    * Initializes a Spark session for DAG execution.
@@ -67,4 +70,9 @@ public interface SparkSession {
    * Close session and release resources.
    */
   void close();
+
+  /**
+   * Get an HDFS dir specific to the SparkSession
+   * */
+  Path getHDFSSessionDir() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index f04e145..51c6715 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
 import java.io.IOException;
 import java.util.UUID;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ObjectPair;
@@ -37,11 +41,14 @@ import com.google.common.base.Preconditions;
 
 public class SparkSessionImpl implements SparkSession {
   private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
+  private static final String SPARK_DIR = "_spark_session_dir";
 
   private HiveConf conf;
   private boolean isOpen;
   private final String sessionId;
   private HiveSparkClient hiveSparkClient;
+  private Path scratchDir;
+  private final Object dirLock = new Object();
 
   public SparkSessionImpl() {
     sessionId = makeSessionId();
@@ -118,6 +125,7 @@ public class SparkSessionImpl implements SparkSession {
     if (hiveSparkClient != null) {
       try {
         hiveSparkClient.close();
+        cleanScratchDir();
       } catch (IOException e) {
         LOG.error("Failed to close spark session (" + sessionId + ").", e);
       }
@@ -125,6 +133,37 @@ public class SparkSessionImpl implements SparkSession {
     hiveSparkClient = null;
   }
 
+  private Path createScratchDir() throws IOException {
+    Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR);
+    Path sparkDir = new Path(parent, sessionId);
+    FileSystem fs = sparkDir.getFileSystem(conf);
+    FsPermission fsPermission = new FsPermission(HiveConf.getVar(
+        conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
+    fs.mkdirs(sparkDir, fsPermission);
+    fs.deleteOnExit(sparkDir);
+    return sparkDir;
+  }
+
+  private void cleanScratchDir() throws IOException {
+    if (scratchDir != null) {
+      FileSystem fs = scratchDir.getFileSystem(conf);
+      fs.delete(scratchDir, true);
+      scratchDir = null;
+    }
+  }
+
+  @Override
+  public Path getHDFSSessionDir() throws IOException {
+    if (scratchDir == null) {
+      synchronized (dirLock) {
+        if (scratchDir == null) {
+          scratchDir = createScratchDir();
+        }
+      }
+    }
+    return scratchDir;
+  }
+
   public static String makeSessionId() {
     return UUID.randomUUID().toString();
   }
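
A side note on the getHDFSSessionDir() hunk above: it follows the double-checked locking idiom, but the scratchDir field it guards is not declared volatile in this patch. In the textbook form of the pattern the cached field is volatile so that the unsynchronized first read is safe under the Java memory model. A minimal standalone sketch of that form, with hypothetical class and helper names that are not part of the commit:

  import java.io.IOException;

  import org.apache.hadoop.fs.Path;

  public class LazySessionDir {
    private final Object dirLock = new Object();
    private volatile Path scratchDir;              // volatile makes the unsynchronized read safe

    public Path getHDFSSessionDir() throws IOException {
      Path dir = scratchDir;                       // single read of the field outside the lock
      if (dir == null) {
        synchronized (dirLock) {
          dir = scratchDir;
          if (dir == null) {
            scratchDir = dir = createScratchDir();
          }
        }
      }
      return dir;
    }

    private Path createScratchDir() throws IOException {
      return new Path("/tmp/example");             // stand-in for the real mkdirs/deleteOnExit logic
    }
  }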

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/gen_udf_example_add10.q b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
new file mode 100644
index 0000000..69178c9
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
@@ -0,0 +1,13 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10';
+
+create table t1(x int,y double);
+load data local inpath '../../data/files/T1.txt' into table t1;
+
+explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+drop table t1;
+drop temporary function example_add10;

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..984554d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -0,0 +1,95 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: t1
+            Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: add10(x) (type: int), add10(y) (type: double)
+              outputColumnNames: _col0, _col1
+              Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              Reduce Output Operator
+                key expressions: _col0 (type: int), _col1 (type: double)
+                sort order: -+
+                Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+          Limit
+            Number of rows: 10
+            Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18	28.0
+18	38.0
+17	27.0
+13	23.0
+12	22.0
+11	21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..05ec1f5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Spark
+      Edges:
+        Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: t1
+                  Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: add10(x) (type: int), add10(y) (type: double)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: _col0 (type: int), _col1 (type: double)
+                      sort order: -+
+                      Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+        Reducer 2 
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                Limit
+                  Number of rows: 10
+                  Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: 10
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18	28.0
+18	38.0
+17	27.0
+13	23.0
+12	22.0
+11	21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10

http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index b779f3f..6251861 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,15 +44,18 @@ public class SparkClientUtilities {
    * Add new elements to the classpath.
    *
    * @param newPaths Map of classpath elements and corresponding timestamp
+   * @return locally accessible files corresponding to the newPaths
    */
-  public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
-      throws Exception {
+  public static List<String> addToClassPath(Map<String, Long> newPaths, Configuration conf,
+      File localTmpDir) throws Exception {
     URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
     List<URL> curPath = Lists.newArrayList(loader.getURLs());
+    List<String> localNewPaths = new ArrayList<>();
 
     boolean newPathAdded = false;
     for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
       URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
+      localNewPaths.add(newUrl.toString());
       if (newUrl != null && !curPath.contains(newUrl)) {
         curPath.add(newUrl);
         LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -64,6 +68,7 @@ public class SparkClientUtilities {
           new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
       Thread.currentThread().setContextClassLoader(newLoader);
     }
+    return localNewPaths;
   }
 
   /**

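One detail worth noting in the hunk above: the pre-existing loop guards the classpath update with a null check on newUrl (urlFromPathString can evidently return null), but the newly added localNewPaths.add(newUrl.toString()) runs before that check. A slightly more defensive ordering of the same loop body, sketched here purely for illustration and relying on the surrounding method's local variables, would be:

      URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
      if (newUrl != null) {
        localNewPaths.add(newUrl.toString());      // record only paths that actually resolved
        if (!curPath.contains(newUrl)) {
          curPath.add(newUrl);
          LOG.info("Added jar[" + newUrl + "] to classpath.");
          newPathAdded = true;
        }
      }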

[10/11] hive git commit: HIVE-9774: Print yarn application id to console [Spark Branch] (Rui reviewed by Xuefu)

Posted by li...@apache.org.
HIVE-9774: Print yarn application id to console [Spark Branch] (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b9f99591
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b9f99591
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b9f99591

Branch: refs/heads/master
Commit: b9f99591f6350d20902708c28bfbb708d7603c6e
Parents: e1a7503
Author: Rui Li <ru...@intel.com>
Authored: Tue Jan 19 09:51:29 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:53:06 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   | 22 +++++++++++++++-----
 .../spark/status/RemoteSparkJobMonitor.java     | 15 +++++++++++++
 .../ql/exec/spark/status/SparkJobStatus.java    |  2 ++
 .../spark/status/impl/LocalSparkJobStatus.java  |  5 +++++
 .../spark/status/impl/RemoteSparkJobStatus.java | 22 ++++++++++++++++++++
 5 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b9d4b5e..0ff0b39 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -387,6 +387,7 @@ public class HiveConf extends Configuration {
     // a symbolic name to reference in the Hive source code. Properties with non-null
     // values will override any values set in the underlying Hadoop configuration.
     HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true),
+    YARNBIN("yarn.bin.path", findYarnBinary(), "", true),
     HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem",
         "The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"),
     MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true),
@@ -2796,16 +2797,27 @@ public class HiveConf extends Configuration {
     }
 
     private static String findHadoopBinary() {
+      String val = findHadoopHome();
+      // if we can't find hadoop home we can at least try /usr/bin/hadoop
+      val = (val == null ? File.separator + "usr" : val)
+          + File.separator + "bin" + File.separator + "hadoop";
+      // Launch hadoop command file on windows.
+      return val + (Shell.WINDOWS ? ".cmd" : "");
+    }
+
+    private static String findYarnBinary() {
+      String val = findHadoopHome();
+      val = (val == null ? "yarn" : val + File.separator + "bin" + File.separator + "yarn");
+      return val + (Shell.WINDOWS ? ".cmd" : "");
+    }
+
+    private static String findHadoopHome() {
       String val = System.getenv("HADOOP_HOME");
       // In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX
       if (val == null) {
         val = System.getenv("HADOOP_PREFIX");
       }
-      // and if all else fails we can at least try /usr/bin/hadoop
-      val = (val == null ? File.separator + "usr" : val)
-        + File.separator + "bin" + File.separator + "hadoop";
-      // Launch hadoop command file on windows.
-      return val + (Shell.WINDOWS ? ".cmd" : "");
+      return val;
     }
 
     public String getDefaultValue() {

http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index fb0498a..6990e80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,10 +34,12 @@ import org.apache.spark.JobExecutionStatus;
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
 
   private RemoteSparkJobStatus sparkJobStatus;
+  private final HiveConf hiveConf;
 
   public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
+    this.hiveConf = hiveConf;
   }
 
   @Override
@@ -77,6 +79,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
             Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
             if (!running) {
               perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+              printAppInfo();
               // print job stages.
               console.printInfo("\nQuery Hive on Spark job["
                 + sparkJobStatus.getJobId() + "] stages:");
@@ -137,4 +140,16 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
     return rc;
   }
+
+  private void printAppInfo() {
+    String sparkMaster = hiveConf.get("spark.master");
+    if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
+      String appID = sparkJobStatus.getAppID();
+      if (appID != null) {
+        console.printInfo("Running with YARN Application = " + appID);
+        console.printInfo("Kill Command = " +
+            HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID);
+      }
+    }
+  }
 }
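
With this change, once a monitored job moves past the submitted state on a YARN-backed cluster, the monitor prints the application id together with a ready-to-copy kill command built from the new YARNBIN setting. The console output would look roughly like the following two lines (the application id and yarn path shown are hypothetical):

  Running with YARN Application = application_1453900000000_0007
  Kill Command = /usr/lib/hadoop/bin/yarn application -kill application_1453900000000_0007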

http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index fa45ec8..7959089 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -29,6 +29,8 @@ import java.util.Map;
  */
 public interface SparkJobStatus {
 
+  String getAppID();
+
   int getJobId();
 
   JobExecutionStatus getState() throws HiveException;

http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index ebc5c16..3c15521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -66,6 +66,11 @@ public class LocalSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
+  public String getAppID() {
+    return sparkContext.sc().applicationId();
+  }
+
+  @Override
   public int getJobId() {
     return jobId;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e8d581f..d84c026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -62,6 +62,17 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
   }
 
   @Override
+  public String getAppID() {
+    Future<String> getAppID = sparkClient.run(new GetAppIDJob());
+    try {
+      return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.warn("Failed to get APP ID.", e);
+      return null;
+    }
+  }
+
+  @Override
   public int getJobId() {
     return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
   }
@@ -268,4 +279,15 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       }
     };
   }
+
+  private static class GetAppIDJob implements Job<String> {
+
+    public GetAppIDJob() {
+    }
+
+    @Override
+    public String call(JobContext jc) throws Exception {
+      return jc.sc().sc().applicationId();
+    }
+  }
 }


[11/11] hive git commit: HIVE-12888: TestSparkNegativeCliDriver does not run in Spark mode [Spark Branch] (Chengxiang via Xuefu)

Posted by li...@apache.org.
HIVE-12888: TestSparkNegativeCliDriver does not run in Spark mode [Spark Branch] (Chengxiang via Xuefu)

Conflicts:
	ql/src/test/templates/TestNegativeCliDriver.vm


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/277825fe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/277825fe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/277825fe

Branch: refs/heads/master
Commit: 277825fe8ea706ffbbb0d35d484097bc9e01a70f
Parents: b9f9959
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Tue Jan 26 19:31:49 2016 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:55:20 2016 +0800

----------------------------------------------------------------------
 ql/src/test/templates/TestNegativeCliDriver.vm | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/277825fe/ql/src/test/templates/TestNegativeCliDriver.vm
----------------------------------------------------------------------
diff --git a/ql/src/test/templates/TestNegativeCliDriver.vm b/ql/src/test/templates/TestNegativeCliDriver.vm
index d1cbbfd..592d64f 100644
--- a/ql/src/test/templates/TestNegativeCliDriver.vm
+++ b/ql/src/test/templates/TestNegativeCliDriver.vm
@@ -34,13 +34,17 @@ public class $className {
 
   static {
     MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+    String hiveConfDir = "$hiveConfDir";
     String initScript = "$initScript";
     String cleanupScript = "$cleanupScript";
 
     try {
       String hadoopVer = "$hadoopVersion";
-      qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, null, hadoopVer,
-       initScript, cleanupScript, false, false);
+      if (!hiveConfDir.isEmpty()) {
+        hiveConfDir = HIVE_ROOT + hiveConfDir;
+      }
+      qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR,
+       hiveConfDir, hadoopVer, initScript, cleanupScript, false, false);
       // do a one time initialization
       qt.cleanUp();
       qt.createSources();


[05/11] hive git commit: HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)

Posted by li...@apache.org.
HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f57569b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f57569b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f57569b

Branch: refs/heads/master
Commit: 9f57569b0f648bb5596df60e0a62db06930778ea
Parents: 72e070f
Author: xzhang <xz...@xzdt>
Authored: Mon Dec 7 11:10:25 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:50:07 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/ServerUtils.java  | 19 +++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ++
 .../service/cli/thrift/ThriftCLIService.java    | 15 +++---
 .../hive/spark/client/rpc/RpcConfiguration.java | 57 +++++++-------------
 4 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public class ServerUtils {
     }
   }
 
+  /**
+   * Get the Inet address of the machine with the given host name.
+   * @param hostname The name of the host
+   * @return The network address of the host
+   * @throws UnknownHostException
+   */
+  public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+    InetAddress serverIPAddress;
+    if (hostname != null && !hostname.isEmpty()) {
+      serverIPAddress = InetAddress.getByName(hostname);
+    } else {
+      serverIPAddress = InetAddress.getLocalHost();
+    }
+    return serverIPAddress;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e10fd4..c4034a5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2601,6 +2601,11 @@ public class HiveConf extends Configuration {
       "Channel logging level for remote Spark driver.  One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
       "Name of the SASL mechanism to use for authentication."),
+    SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+      "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " + 
+      "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
+      "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
+      "hive.server2.thrift.bind.host is to be used."),
     SPARK_DYNAMIC_PARTITION_PRUNING(
         "hive.spark.dynamic.partition.pruning", false,
         "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 78b4b31..5c8fa3c 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.ServiceUtils;
@@ -203,21 +204,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-    // Initialize common server configs needed in both binary & http modes
-    String portString;
-    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+
+    String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
     if (hiveHost == null) {
       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
     }
     try {
-      if (hiveHost != null && !hiveHost.isEmpty()) {
-        serverIPAddress = InetAddress.getByName(hiveHost);
-      } else {
-        serverIPAddress = InetAddress.getLocalHost();
-      }
+      serverIPAddress = ServerUtils.getHostAddress(hiveHost);
     } catch (UnknownHostException e) {
       throw new ServiceException(e);
     }
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
     // HTTP mode
     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
       workerKeepAliveTime =

http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 9c8cea0..e387659 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,20 +18,19 @@
 package org.apache.hive.spark.client.rpc;
 
 import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import javax.security.sasl.Sasl;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -49,15 +48,14 @@ public final class RpcConfiguration {
     HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+    HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
   );
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
   );
 
-  public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
-
   /** Prefix for other SASL options. */
   public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl.";
 
@@ -91,39 +89,22 @@ public final class RpcConfiguration {
     return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
+  /**
+   * Here we assume that the remote driver will connect back to HS2 using the same network interface
+   * as if it were just a HS2 client. If this isn't true, we can have a separate configuration for that.
+   * For now, I think we are okay.
+   * @return server host name in the network
+   * @throws IOException
+   */
   String getServerAddress() throws IOException {
-    String value = config.get(SERVER_LISTEN_ADDRESS_KEY);
-    if (value != null) {
-      return value;
-    }
-
-    InetAddress address = InetAddress.getLocalHost();
-    if (address.isLoopbackAddress()) {
-      // Address resolves to something like 127.0.1.1, which happens on Debian;
-      // try to find a better address using the local network interfaces
-      Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-      while (ifaces.hasMoreElements()) {
-        NetworkInterface ni = ifaces.nextElement();
-        Enumeration<InetAddress> addrs = ni.getInetAddresses();
-        while (addrs.hasMoreElements()) {
-          InetAddress addr = addrs.nextElement();
-          if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
-              && addr instanceof Inet4Address) {
-            // We've found an address that looks reasonable!
-            LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
-                + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
-                ni.getName());
-            LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-            return addr.getHostAddress();
-          }
-        }
+    String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS);
+    if(StringUtils.isEmpty(hiveHost)) {
+      hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+      if (hiveHost == null) {
+        hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
       }
     }
-
-    LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
-        + " any external IP address!", address.getHostName());
-    LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-    return address.getHostName();
+    return ServerUtils.getHostAddress(hiveHost).getHostName();
   }
 
   String getRpcChannelLogLevel() {

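In short, the callback address used by the remote driver is now resolved in a fixed order: hive.spark.client.rpc.server.address if set, then the HIVE_SERVER2_THRIFT_BIND_HOST environment variable, then hive.server2.thrift.bind.host, with ServerUtils.getHostAddress() falling back to the local host when none of those is configured; the old interface-scanning fallback and its hive.spark.client.server.address key are removed.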

[07/11] hive git commit: HIVE-12811: Name yarn application something more meaningful than just "Hive on Spark" (Rui reviewed by Xuefu)

Posted by li...@apache.org.
HIVE-12811: Name yarn application something more meaningful than just "Hive on Spark" (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e47e1142
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e47e1142
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e47e1142

Branch: refs/heads/master
Commit: e47e11424f8867fb88266bda796f5639ccdd6d28
Parents: bb5ad57
Author: Rui Li <ru...@intel.com>
Authored: Wed Jan 13 10:41:27 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:51:15 2016 +0800

----------------------------------------------------------------------
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java     | 3 ++-
 .../hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java     | 7 ++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e47e1142/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c4034a5..b9d4b5e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2925,7 +2925,8 @@ public class HiveConf extends Configuration {
   private boolean isSparkRelatedConfig(String name) {
     boolean result = false;
     if (name.startsWith("spark")) { // Spark property.
-      result = true;
+      // for now we don't support changing spark app name on the fly
+      result = !name.equals("spark.app.name");
     } else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
       String sparkMaster = get("spark.master");
       if (sparkMaster != null &&

http://git-wip-us.apache.org/repos/asf/hive/blob/e47e1142/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 9b2dce3..a832bf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -72,7 +72,12 @@ public class HiveSparkClientFactory {
 
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
-    sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
+    final String appNameKey = "spark.app.name";
+    String appName = hiveConf.get(appNameKey);
+    if (appName == null) {
+      appName = SPARK_DEFAULT_APP_NAME;
+    }
+    sparkConf.put(appNameKey, appName);
     sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
     sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);
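
A practical consequence of the two hunks above: a session can choose its own application name by setting spark.app.name before the first Spark query runs, for example with set spark.app.name=nightly-report; (a hypothetical name). Because isSparkRelatedConfig() now returns false for that key, changing it later in the same session is no longer treated as a Spark-related configuration change, and the already-running application keeps its original name.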
 


[02/11] hive git commit: HIVE-12466: SparkCounter not initialized error (Rui via Chengxiang)

Posted by li...@apache.org.
HIVE-12466: SparkCounter not initialized error (Rui via Chengxiang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/120df071
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/120df071
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/120df071

Branch: refs/heads/master
Commit: 120df07186703dd2ecc930bbc5dfda191ad40773
Parents: 5cd4891
Author: chengxiang <ch...@apache.com>
Authored: Wed Nov 25 11:07:12 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:22:09 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java      | 17 ++++++++++-------
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java    | 14 +++++++++-----
 .../hadoop/hive/ql/exec/spark/SparkTask.java       |  4 ++--
 3 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 14121b6..0899793 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -441,13 +441,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       cntr = 1;
       logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
-      String suffix = Integer.toString(conf.getDestTableId());
-      String fullName = conf.getTableInfo().getTableName();
-      if (fullName != null) {
-        suffix = suffix + "_" + fullName.toLowerCase();
-      }
-
-      statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count);
+      statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count);
     } catch (HiveException e) {
       throw e;
     } catch (Exception e) {
@@ -456,6 +450,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
+  public String getCounterName(Counter counter) {
+    String suffix = Integer.toString(conf.getDestTableId());
+    String fullName = conf.getTableInfo().getTableName();
+    if (fullName != null) {
+      suffix = suffix + "_" + fullName.toLowerCase();
+    }
+    return counter + "_" + suffix;
+  }
+
   private void logOutputFormatError(Configuration hconf, HiveException ex) {
     StringWriter errorWriter = new StringWriter();
     errorWriter.append("Failed to create output format; configuration: ");

http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 74b4802..e692460 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -170,11 +170,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       cntr = 1;
       logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
 
-      String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
-      if (context != null && !context.isEmpty()) {
-        context = "_" + context.replace(" ","_");
-      }
-      statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+      statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter);
 
       List<ExprNodeDesc> keys = conf.getKeyCols();
 
@@ -256,6 +252,14 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     }
   }
 
+  public String getCounterName(Counter counter, Configuration hconf) {
+    String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+    if (context != null && !context.isEmpty()) {
+      context = "_" + context.replace(" ", "_");
+    }
+    return counter + context;
+  }
+
 
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct

http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eb93aca..faa075a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -386,11 +386,11 @@ public class SparkTask extends Task<SparkWork> {
       for (Operator<? extends OperatorDesc> operator : work.getAllOperators()) {
         if (operator instanceof FileSinkOperator) {
           for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
-            hiveCounters.add(counter.toString());
+            hiveCounters.add(((FileSinkOperator) operator).getCounterName(counter));
           }
         } else if (operator instanceof ReduceSinkOperator) {
           for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
-            hiveCounters.add(counter.toString());
+            hiveCounters.add(((ReduceSinkOperator) operator).getCounterName(counter, conf));
           }
         } else if (operator instanceof ScriptOperator) {
           for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
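
The fix aligns the names SparkTask registers with the names the operators actually increment: previously the bare enum names (RECORDS_OUT, RECORDS_OUT_INTERMEDIATE) were registered while the operators used suffixed per-operator names, so the counters being incremented had never been initialized. A hedged sketch of how those names are composed, with plain strings standing in for the operator classes and the Hadoop Configuration:

public class CounterNames {
  // Mirrors FileSinkOperator.getCounterName(Counter): COUNTER_<destTableId>[_<tablename>]
  static String fileSinkCounterName(String counter, int destTableId, String tableName) {
    String suffix = Integer.toString(destTableId);
    if (tableName != null) {
      suffix = suffix + "_" + tableName.toLowerCase();
    }
    return counter + "_" + suffix;
  }

  // Mirrors ReduceSinkOperator.getCounterName(Counter, Configuration): COUNTER[_<context>]
  static String reduceSinkCounterName(String counter, String contextName) {
    String context = contextName == null ? "" : contextName;
    if (!context.isEmpty()) {
      context = "_" + context.replace(" ", "_");
    }
    return counter + context;
  }

  public static void main(String[] args) {
    // Table name and context below are illustrative values.
    System.out.println(fileSinkCounterName("RECORDS_OUT", 1, "default.src"));        // RECORDS_OUT_1_default.src
    System.out.println(reduceSinkCounterName("RECORDS_OUT_INTERMEDIATE", "Map 1"));  // RECORDS_OUT_INTERMEDIATE_Map_1
  }
}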


[09/11] hive git commit: HIVE-12611: Make sure spark.yarn.queue is effective and takes the value from mapreduce.job.queuename if given [Spark Branch] (Rui reviewed by Xuefu)

Posted by li...@apache.org.
HIVE-12611: Make sure spark.yarn.queue is effective and takes the value from mapreduce.job.queuename if given [Spark Branch] (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e1a7503b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e1a7503b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e1a7503b

Branch: refs/heads/master
Commit: e1a7503b81a5509e391940709c8f9a6f552646a7
Parents: 34b41e3
Author: Rui Li <ru...@intel.com>
Authored: Mon Jan 18 09:14:56 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:52:38 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java   | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e1a7503b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index a832bf6..993d02b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -174,6 +174,15 @@ public class HiveSparkClientFactory {
     classes.add(HiveKey.class.getName());
     sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
 
+    // set yarn queue name
+    final String sparkQueueNameKey = "spark.yarn.queue";
+    if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+      String queueName = hiveConf.get("mapreduce.job.queuename");
+      if (queueName != null) {
+        sparkConf.put(sparkQueueNameKey, queueName);
+      }
+    }
+
     return sparkConf;
   }
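
The guard order matters here: an explicit spark.yarn.queue always wins, and mapreduce.job.queuename is only consulted when the master is YARN. A small standalone sketch of that precedence, with plain maps standing in for HiveConf and the Spark conf:

import java.util.HashMap;
import java.util.Map;

public class QueueNameResolution {
  static void applyQueueName(Map<String, String> hiveConf, Map<String, String> sparkConf,
      String sparkMaster) {
    final String sparkQueueNameKey = "spark.yarn.queue";
    // Only on YARN, and only if the user did not set spark.yarn.queue directly.
    if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
      String queueName = hiveConf.get("mapreduce.job.queuename");
      if (queueName != null) {
        sparkConf.put(sparkQueueNameKey, queueName);
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> hiveConf = new HashMap<>();
    Map<String, String> sparkConf = new HashMap<>();
    hiveConf.put("mapreduce.job.queuename", "etl");   // illustrative queue name
    applyQueueName(hiveConf, sparkConf, "yarn-client");
    System.out.println(sparkConf.get("spark.yarn.queue"));  // etl
  }
}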
 


[06/11] hive git commit: HIVE-12708: Hive on Spark doesn't work with Kerberized HBase [Spark Branch] (reviewed by Szehon)

Posted by li...@apache.org.
HIVE-12708: Hive on Spark doesn't work with Kerberized HBase [Spark Branch] (reviewed by Szehon)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb5ad573
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb5ad573
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb5ad573

Branch: refs/heads/master
Commit: bb5ad5733d0d5dfc2489deb658beea609405a29c
Parents: 9f57569
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Fri Dec 18 14:37:03 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:50:43 2016 +0800

----------------------------------------------------------------------
 .../hive/ql/exec/spark/HiveSparkClientFactory.java       | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bb5ad573/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index ec0fdea..9b2dce3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -30,6 +30,7 @@ import org.apache.commons.compress.utils.CharsetNames;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -67,6 +68,7 @@ public class HiveSparkClientFactory {
 
   public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
     Map<String, String> sparkConf = new HashMap<String, String>();
+    HBaseConfiguration.addHbaseResources(hiveConf);
 
     // set default spark configurations.
     sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
@@ -139,7 +141,16 @@ public class HiveSparkClientFactory {
         if (value != null && !value.isEmpty()) {
           sparkConf.put("spark.hadoop." + propertyName, value);
         }
+      } else if (propertyName.startsWith("hbase")) {
+        // Add HBase related configuration to Spark because in security mode, Spark needs it
+        // to generate hbase delegation token for Spark. This is a temp solution to deal with
+        // Spark problem.
+        String value = hiveConf.get(propertyName);
+        sparkConf.put("spark.hadoop." + propertyName, value);
+        LOG.info(String.format(
+          "load HBase configuration (%s -> %s).", propertyName, value));
       }
+
       if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
         String value = RpcConfiguration.getValue(hiveConf, propertyName);
         sparkConf.put(propertyName, value);
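
In a Kerberized cluster the Spark driver needs the HBase client settings in order to request an HBase delegation token, so every hbase.* property is forwarded to Spark under the spark.hadoop. prefix. A minimal sketch of that forwarding, with a plain map standing in for HiveConf:

import java.util.HashMap;
import java.util.Map;

public class HBaseConfPropagation {
  // Copy every hbase.* property into the Spark conf under the spark.hadoop. prefix,
  // so the Spark side sees the same HBase client configuration as Hive.
  static Map<String, String> propagateHBaseProps(Map<String, String> hiveConf) {
    Map<String, String> sparkConf = new HashMap<>();
    for (Map.Entry<String, String> e : hiveConf.entrySet()) {
      if (e.getKey().startsWith("hbase")) {
        sparkConf.put("spark.hadoop." + e.getKey(), e.getValue());
      }
    }
    return sparkConf;
  }

  public static void main(String[] args) {
    Map<String, String> hiveConf = new HashMap<>();
    hiveConf.put("hbase.zookeeper.quorum", "zk1,zk2,zk3");  // illustrative value
    System.out.println(propagateHBaseProps(hiveConf));      // {spark.hadoop.hbase.zookeeper.quorum=zk1,zk2,zk3}
  }
}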


[08/11] hive git commit: HIVE-12828: Update Spark version to 1.6 (Rui reviewed by Xuefu)

Posted by li...@apache.org.
HIVE-12828: Update Spark version to 1.6 (Rui reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b41e38
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b41e38
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b41e38

Branch: refs/heads/master
Commit: 34b41e389ab331f1d61b077fbea8cb855775698f
Parents: e47e114
Author: Rui Li <ru...@intel.com>
Authored: Fri Jan 15 21:24:39 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:51:55 2016 +0800

----------------------------------------------------------------------
 data/conf/spark/yarn-client/hive-site.xml          | 17 ++++++++++++++++-
 pom.xml                                            |  2 +-
 .../apache/hadoop/hive/shims/Hadoop23Shims.java    |  2 ++
 3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index bc42f0d..f1d9ddc 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -231,7 +231,22 @@
 
 <property>
   <name>spark.executor.memory</name>
-  <value>512m</value>
+  <value>1g</value>
+</property>
+
+<property>
+  <name>spark.yarn.executor.memoryOverhead</name>
+  <value>0</value>
+</property>
+
+<property>
+  <name>spark.driver.memory</name>
+  <value>1g</value>
+</property>
+
+<property>
+  <name>spark.yarn.driver.memoryOverhead</name>
+  <value>0</value>
 </property>
 
 <property>

http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2066518..2d2a3de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
     <ST4.version>4.0.4</ST4.version>
     <tez.version>0.8.2</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
-    <spark.version>1.5.0</spark.version>
+    <spark.version>1.6.0</spark.version>
     <scala.binary.version>2.10</scala.binary.version>
     <scala.version>2.10.4</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index c03ae35..31060a2 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -526,6 +526,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
       mr = new MiniSparkOnYARNCluster("sparkOnYarn");
       conf.set("fs.defaultFS", nameNode);
       conf.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+      // disable resource monitoring, although it should be off by default
+      conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, false);
       configureImpersonation(conf);
       mr.init(conf);
       mr.start();


[04/11] hive git commit: HIVE-12515: Clean up the SparkCounters-related code after removing counter-based stats collection [Spark Branch] (Rui reviewed by Chengxiang & Xuefu)

Posted by li...@apache.org.
HIVE-12515: Clean up the SparkCounters-related code after removing counter-based stats collection [Spark Branch] (Rui reviewed by Chengxiang & Xuefu)

Conflicts:
	ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72e070f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72e070f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72e070f4

Branch: refs/heads/master
Commit: 72e070f432db3eadc9b6d538157714cf8f47fca1
Parents: 4a41ba5
Author: Rui Li <ru...@intel.com>
Authored: Thu Dec 3 16:37:05 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:49:12 2016 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |   4 -
 .../hadoop/hive/ql/exec/spark/SparkTask.java    | 146 +------------------
 2 files changed, 1 insertion(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4df7d25..ec6a2c7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \
   stats7.q, \
   stats8.q, \
   stats9.q, \
-  stats_counter.q, \
-  stats_counter_partitioned.q, \
   stats_noscan_1.q, \
   stats_noscan_2.q, \
   stats_only_null.q, \
@@ -1298,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
   schemeAuthority2.q,\
   scriptfile1.q,\
   scriptfile1_win.q,\
-  stats_counter.q,\
-  stats_counter_partitioned.q,\
   temp_table_external.q,\
   truncate_column_buckets.q,\
   uber_reduce.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index faa075a..26cce1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,11 +28,7 @@ import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -44,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -58,25 +51,15 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
@@ -86,7 +69,6 @@ public class SparkTask extends Task<SparkWork> {
   private static final LogHelper console = new LogHelper(LOG);
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final long serialVersionUID = 1L;
-  private SparkCounters sparkCounters;
 
   @Override
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
@@ -106,7 +88,7 @@ public class SparkTask extends Task<SparkWork> {
       sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
 
       SparkWork sparkWork = getWork();
-      sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+      sparkWork.setRequiredCounterPrefix(getOperatorCounters());
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -116,8 +98,6 @@ public class SparkTask extends Task<SparkWork> {
       rc = jobRef.monitorJob();
       SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
       if (rc == 0) {
-        sparkCounters = sparkJobStatus.getCounter();
-        // for RSC, we should get the counters after job has finished
         SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
         if (LOG.isInfoEnabled() && sparkStatistics != null) {
           LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -233,10 +213,6 @@ public class SparkTask extends Task<SparkWork> {
     return ((ReduceWork) children.get(0)).getReducer();
   }
 
-  public SparkCounters getSparkCounters() {
-    return sparkCounters;
-  }
-
   /**
    * Set the number of reducers for the spark work.
    */
@@ -250,126 +226,6 @@ public class SparkTask extends Task<SparkWork> {
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
 
-  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
-    Map<String, List<String>> counters = getOperatorCounters();
-    StatsTask statsTask = getStatsTaskInChildTasks(this);
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    // fetch table prefix if SparkTask try to gather table statistics based on counter.
-    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-      List<String> prefixes = getRequiredCounterPrefix(statsTask);
-      for (String prefix : prefixes) {
-        List<String> counterGroup = counters.get(prefix);
-        if (counterGroup == null) {
-          counterGroup = new LinkedList<String>();
-          counters.put(prefix, counterGroup);
-        }
-        counterGroup.add(StatsSetupConst.ROW_COUNT);
-        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
-      }
-    }
-    return counters;
-  }
-
-  private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
-    List<String> prefixs = new LinkedList<String>();
-    StatsWork statsWork = statsTask.getWork();
-    String tablePrefix = getTablePrefix(statsWork);
-    List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-
-    if (partitionSpecs == null) {
-      prefixs.add(tablePrefix.endsWith(Path.SEPARATOR) ? tablePrefix : tablePrefix + Path.SEPARATOR);
-    } else {
-      for (Map<String, String> partitionSpec : partitionSpecs) {
-        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
-        prefixs.add(prefixWithPartition.endsWith(Path.SEPARATOR) ? prefixWithPartition : prefixWithPartition + Path.SEPARATOR);
-      }
-    }
-
-    return prefixs;
-  }
-
-  private String getTablePrefix(StatsWork work) throws HiveException {
-      String tableName;
-      if (work.getLoadTableDesc() != null) {
-        tableName = work.getLoadTableDesc().getTable().getTableName();
-      } else if (work.getTableSpecs() != null) {
-        tableName = work.getTableSpecs().tableName;
-      } else {
-        tableName = work.getLoadFileDesc().getDestinationCreateTable();
-      }
-    Table table;
-    try {
-      table = db.getTable(tableName);
-    } catch (HiveException e) {
-      LOG.warn("Failed to get table:" + tableName);
-      // For CTAS query, table does not exist in this period, just use table name as prefix.
-      return tableName.toLowerCase();
-    }
-    return table.getDbName() + "." + table.getTableName();
-  }
-
-  private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
-    List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
-    if (childTasks == null) {
-      return null;
-    }
-    for (Task<? extends Serializable> task : childTasks) {
-      if (task instanceof StatsTask) {
-        return (StatsTask) task;
-      } else {
-        Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
-        if (childTask instanceof StatsTask) {
-          return (StatsTask) childTask;
-        } else {
-          continue;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
-    if (work.getLoadFileDesc() != null) {
-      return null; //we are in CTAS, so we know there are no partitions
-    }
-    Table table;
-    List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
-    if (work.getTableSpecs() != null) {
-
-      // ANALYZE command
-      TableSpec tblSpec = work.getTableSpecs();
-      table = tblSpec.tableHandle;
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      // get all partitions that matches with the partition spec
-      List<Partition> partitions = tblSpec.partitions;
-      if (partitions != null) {
-        for (Partition partition : partitions) {
-          partitionSpecs.add(partition.getSpec());
-        }
-      }
-    } else if (work.getLoadTableDesc() != null) {
-
-      // INSERT OVERWRITE command
-      LoadTableDesc tbd = work.getLoadTableDesc();
-      table = db.getTable(tbd.getTable().getTableName());
-      if (!table.isPartitioned()) {
-        return null;
-      }
-      DynamicPartitionCtx dpCtx = tbd.getDPCtx();
-      if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
-        // we could not get dynamic partition information before SparkTask execution.
-      } else { // static partition
-        partitionSpecs.add(tbd.getPartitionSpec());
-      }
-    }
-    return partitionSpecs;
-  }
-
   private Map<String, List<String>> getOperatorCounters() {
     String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     Map<String, List<String>> counters = new HashMap<String, List<String>>();
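
After the cleanup, the only counter prefixes SparkTask hands to SparkWork.setRequiredCounterPrefix are the per-operator record counters under the hive.counters.group.name group; the table and partition prefixes used by counter-based stats collection are gone. A hedged sketch of the shape of that map — the group and counter names below are illustrative:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OperatorCounterPrefixes {
  // Shape of the map passed to SparkWork.setRequiredCounterPrefix after the cleanup:
  // one entry per counter group, listing the per-operator counter names to track.
  static Map<String, List<String>> operatorCounters(String groupName) {
    Map<String, List<String>> counters = new HashMap<>();
    counters.put(groupName, Arrays.asList(
        "RECORDS_OUT_1_default.src",           // a FileSinkOperator counter (illustrative)
        "RECORDS_OUT_INTERMEDIATE_Map_1"));    // a ReduceSinkOperator counter (illustrative)
    return counters;
  }

  public static void main(String[] args) {
    // "HIVE" stands in for whatever hive.counters.group.name is configured to.
    System.out.println(operatorCounters("HIVE"));
  }
}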


[03/11] hive git commit: HIVE-12554: Fix Spark branch build after merge [Spark Branch] (Rui via Xuefu)

Posted by li...@apache.org.
HIVE-12554: Fix Spark branch build after merge [Spark Branch] (Rui via Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a41ba5c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a41ba5c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a41ba5c

Branch: refs/heads/master
Commit: 4a41ba5cf029cd39d6f3c1765e2d9d4f090eab58
Parents: 120df07
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Tue Dec 1 10:49:04 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:22:45 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java  | 3 ++-
 ql/src/test/results/clientpositive/gen_udf_example_add10.q.out    | 1 +
 .../test/results/clientpositive/spark/gen_udf_example_add10.q.out | 1 +
 3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d215873..ec0fdea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -133,7 +134,7 @@ public class HiveSparkClientFactory {
         LOG.info(String.format(
           "load yarn property from hive configuration in %s mode (%s -> %s).",
           sparkMaster, propertyName, value));
-      } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+      } else if (propertyName.equals(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) {
         String value = hiveConf.get(propertyName);
         if (value != null && !value.isEmpty()) {
           sparkConf.put("spark.hadoop." + propertyName, value);

http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
index 984554d..cab2ec8 100644
--- a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -43,6 +43,7 @@ STAGE PLANS:
                 key expressions: _col0 (type: int), _col1 (type: double)
                 sort order: -+
                 Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                TopN Hash Memory Usage: 0.1
       Reduce Operator Tree:
         Select Operator
           expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)

http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
index 05ec1f5..493d0a4 100644
--- a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -48,6 +48,7 @@ STAGE PLANS:
                       key expressions: _col0 (type: int), _col1 (type: double)
                       sort order: -+
                       Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+                      TopN Hash Memory Usage: 0.1
         Reducer 2 
             Reduce Operator Tree:
               Select Operator