You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2016/01/28 08:20:32 UTC
[01/11] hive git commit: HIVE-12045: ClassNotFoundException for
GenericUDF [Spark Branch] (Rui via Xuefu)
Repository: hive
Updated Branches:
refs/heads/master 8c65c3455 -> 277825fe8
HIVE-12045: ClassNotFoundException for GenericUDF [Spark Branch] (Rui via Xuefu)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cd48911
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cd48911
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cd48911
Branch: refs/heads/master
Commit: 5cd4891192e63075b9c0ed2425294a45b3ca8688
Parents: 8c65c34
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Fri Nov 20 14:16:21 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:20:35 2016 +0800
----------------------------------------------------------------------
.../genericudf/example/GenericUDFAdd10.java | 151 +++++++++++++++++++
data/conf/spark/yarn-client/hive-site.xml | 7 +-
.../test/resources/testconfiguration.properties | 1 +
.../ql/exec/spark/HiveSparkClientFactory.java | 5 +
.../ql/exec/spark/RemoteHiveSparkClient.java | 5 +-
.../hive/ql/exec/spark/SparkUtilities.java | 3 +-
.../ql/exec/spark/session/SparkSession.java | 8 +
.../ql/exec/spark/session/SparkSessionImpl.java | 39 +++++
.../clientpositive/gen_udf_example_add10.q | 13 ++
.../clientpositive/gen_udf_example_add10.q.out | 95 ++++++++++++
.../spark/gen_udf_example_add10.q.out | 101 +++++++++++++
.../hive/spark/client/SparkClientUtilities.java | 9 +-
12 files changed, 431 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
----------------------------------------------------------------------
diff --git a/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
new file mode 100644
index 0000000..b87de09
--- /dev/null
+++ b/contrib/src/java/org/apache/hadoop/hive/contrib/genericudf/example/GenericUDFAdd10.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.contrib.genericudf.example;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * GenericUDFAdd10.
+ *
+ */
+@Description(name = "add10",
+ value = "_FUNC_(x) - returns 10 plus the original value of x",
+ extended = "Example:\n"
+ + " > SELECT _FUNC_(0) FROM src LIMIT 1;\n"
+ + " 10\n"
+ + " > SELECT _FUNC_(-5) FROM src LIMIT 1;\n" + " 5")
+public class GenericUDFAdd10 extends GenericUDF {
+ private transient PrimitiveCategory inputType;
+ private final DoubleWritable resultDouble = new DoubleWritable();
+ private final LongWritable resultLong = new LongWritable();
+ private final IntWritable resultInt = new IntWritable();
+ private final HiveDecimalWritable resultDecimal = new HiveDecimalWritable();
+ private transient PrimitiveObjectInspector argumentOI;
+ private transient Converter inputConverter;
+
+ @Override
+ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+ if (arguments.length != 1) {
+ throw new UDFArgumentLengthException(
+ "ADD10() requires 1 argument, got " + arguments.length);
+ }
+
+ if (arguments[0].getCategory() != Category.PRIMITIVE) {
+ throw new UDFArgumentException(
+ "ADD10 only takes primitive types, got " + arguments[0].getTypeName());
+ }
+ argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+ inputType = argumentOI.getPrimitiveCategory();
+ ObjectInspector outputOI = null;
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+ break;
+ case LONG:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+ break;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+ outputOI = PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+ break;
+ case DECIMAL:
+ outputOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+ ((PrimitiveObjectInspector) arguments[0]).getTypeInfo());
+ inputConverter = ObjectInspectorConverters.getConverter(arguments[0],
+ outputOI);
+ break;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ return outputOI;
+ }
+
+ @Override
+ public Object evaluate(DeferredObject[] arguments) throws HiveException {
+ Object valObject = arguments[0].get();
+ if (valObject == null) {
+ return null;
+ }
+ switch (inputType) {
+ case SHORT:
+ case BYTE:
+ case INT:
+ valObject = inputConverter.convert(valObject);
+ resultInt.set(10 + ((IntWritable) valObject).get());
+ return resultInt;
+ case LONG:
+ valObject = inputConverter.convert(valObject);
+ resultLong.set(10 + ((LongWritable) valObject).get());
+ return resultLong;
+ case FLOAT:
+ case STRING:
+ case DOUBLE:
+ valObject = inputConverter.convert(valObject);
+ resultDouble.set(10.0 + ((DoubleWritable) valObject).get());
+ return resultDouble;
+ case DECIMAL:
+ HiveDecimalObjectInspector decimalOI =
+ (HiveDecimalObjectInspector) argumentOI;
+ HiveDecimalWritable val = decimalOI.getPrimitiveWritableObject(valObject);
+
+ if (val != null) {
+ resultDecimal.set(val.getHiveDecimal().add(HiveDecimal.create("10")));
+ val = resultDecimal;
+ }
+ return val;
+ default:
+ throw new UDFArgumentException(
+ "ADD10 only takes SHORT/BYTE/INT/LONG/DOUBLE/FLOAT/STRING/DECIMAL types, got " + inputType);
+ }
+ }
+
+ @Override
+ public String getDisplayString(String[] children) {
+ return getStandardDisplayString("add10", children);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index aef7877..bc42f0d 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -196,7 +196,7 @@
<property>
<name>spark.master</name>
- <value>yarn-client</value>
+ <value>yarn-cluster</value>
</property>
<property>
@@ -255,4 +255,9 @@
<description>Internal marker for test. Used for masking env-dependent values</description>
</property>
+<property>
+ <name>hive.spark.client.connect.timeout</name>
+ <value>30000ms</value>
+</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 8318c3a..4df7d25 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1262,6 +1262,7 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
empty_dir_in_table.q,\
external_table_with_space_in_location_path.q,\
file_with_header_footer.q,\
+ gen_udf_example_add10.q,\
import_exported_table.q,\
index_bitmap3.q,\
index_bitmap_auto.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 259c12f..d215873 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -133,6 +133,11 @@ public class HiveSparkClientFactory {
LOG.info(String.format(
"load yarn property from hive configuration in %s mode (%s -> %s).",
sparkMaster, propertyName, value));
+ } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+ String value = hiveConf.get(propertyName);
+ if (value != null && !value.isEmpty()) {
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ }
}
if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
String value = RpcConfiguration.getValue(hiveConf, propertyName);
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 6380774..11e7116 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -297,8 +297,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
// may need to load classes from this jar in other threads.
Map<String, Long> addedJars = jc.getAddedJars();
if (addedJars != null && !addedJars.isEmpty()) {
- SparkClientUtilities.addToClassPath(addedJars, localJobConf, jc.getLocalTmpDir());
- localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(addedJars.keySet(), ";"));
+ List<String> localAddedJars = SparkClientUtilities.addToClassPath(addedJars,
+ localJobConf, jc.getLocalTmpDir());
+ localJobConf.set(Utilities.HIVE_ADDED_JARS, StringUtils.join(localAddedJars, ";"));
}
Path localScratchDir = KryoSerializer.deserialize(scratchDirBytes, Path.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index a61cdc5..5a6bef9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -89,7 +89,8 @@ public class SparkUtilities {
*/
public static URI uploadToHDFS(URI source, HiveConf conf) throws IOException {
Path localFile = new Path(source.getPath());
- Path remoteFile = new Path(SessionState.getHDFSSessionPath(conf), getFileName(source));
+ Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
+ getFileName(source));
FileSystem fileSystem = FileSystem.get(conf);
// Overwrite if the remote file already exists. Whether the file can be added
// on executor is up to spark, i.e. spark.files.overwrite
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
index 3d4b39b..e1f8c1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.exec.spark.session;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -24,6 +25,8 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import java.io.IOException;
+
public interface SparkSession {
/**
* Initializes a Spark session for DAG execution.
@@ -67,4 +70,9 @@ public interface SparkSession {
* Close session and release resources.
*/
void close();
+
+ /**
+ * Get an HDFS dir specific to the SparkSession
+ * */
+ Path getHDFSSessionDir() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index f04e145..51c6715 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hive.ql.exec.spark.session;
import java.io.IOException;
import java.util.UUID;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -37,11 +41,14 @@ import com.google.common.base.Preconditions;
public class SparkSessionImpl implements SparkSession {
private static final Logger LOG = LoggerFactory.getLogger(SparkSession.class);
+ private static final String SPARK_DIR = "_spark_session_dir";
private HiveConf conf;
private boolean isOpen;
private final String sessionId;
private HiveSparkClient hiveSparkClient;
+ private Path scratchDir;
+ private final Object dirLock = new Object();
public SparkSessionImpl() {
sessionId = makeSessionId();
@@ -118,6 +125,7 @@ public class SparkSessionImpl implements SparkSession {
if (hiveSparkClient != null) {
try {
hiveSparkClient.close();
+ cleanScratchDir();
} catch (IOException e) {
LOG.error("Failed to close spark session (" + sessionId + ").", e);
}
@@ -125,6 +133,37 @@ public class SparkSessionImpl implements SparkSession {
hiveSparkClient = null;
}
+ private Path createScratchDir() throws IOException {
+ Path parent = new Path(SessionState.get().getHdfsScratchDirURIString(), SPARK_DIR);
+ Path sparkDir = new Path(parent, sessionId);
+ FileSystem fs = sparkDir.getFileSystem(conf);
+ FsPermission fsPermission = new FsPermission(HiveConf.getVar(
+ conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION));
+ fs.mkdirs(sparkDir, fsPermission);
+ fs.deleteOnExit(sparkDir);
+ return sparkDir;
+ }
+
+ private void cleanScratchDir() throws IOException {
+ if (scratchDir != null) {
+ FileSystem fs = scratchDir.getFileSystem(conf);
+ fs.delete(scratchDir, true);
+ scratchDir = null;
+ }
+ }
+
+ @Override
+ public Path getHDFSSessionDir() throws IOException {
+ if (scratchDir == null) {
+ synchronized (dirLock) {
+ if (scratchDir == null) {
+ scratchDir = createScratchDir();
+ }
+ }
+ }
+ return scratchDir;
+ }
+
public static String makeSessionId() {
return UUID.randomUUID().toString();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/gen_udf_example_add10.q b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
new file mode 100644
index 0000000..69178c9
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/gen_udf_example_add10.q
@@ -0,0 +1,13 @@
+add jar ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;
+
+create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10';
+
+create table t1(x int,y double);
+load data local inpath '../../data/files/T1.txt' into table t1;
+
+explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10;
+
+drop table t1;
+drop temporary function example_add10;
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..984554d
--- /dev/null
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -0,0 +1,95 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: add10(x) (type: int), add10(y) (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: double)
+ sort order: -+
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Limit
+ Number of rows: 10
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 10
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18 28.0
+18 38.0
+17 27.0
+13 23.0
+12 22.0
+11 21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
new file mode 100644
index 0000000..05ec1f5
--- /dev/null
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+PREHOOK: type: CREATEFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: create temporary function example_add10 as 'org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFAdd10'
+POSTHOOK: type: CREATEFUNCTION
+POSTHOOK: Output: example_add10
+PREHOOK: query: create table t1(x int,y double)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1
+POSTHOOK: query: create table t1(x int,y double)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1
+PREHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+PREHOOK: type: LOAD
+#### A masked pattern was here ####
+PREHOOK: Output: default@t1
+POSTHOOK: query: load data local inpath '../../data/files/T1.txt' into table t1
+POSTHOOK: type: LOAD
+#### A masked pattern was here ####
+POSTHOOK: Output: default@t1
+PREHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+POSTHOOK: query: explain select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Spark
+ Edges:
+ Reducer 2 <- Map 1 (SORT, 1)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: t1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: add10(x) (type: int), add10(y) (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: _col0 (type: int), _col1 (type: double)
+ sort order: -+
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Reducer 2
+ Reduce Operator Tree:
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ Limit
+ Number of rows: 10
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: 10
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1
+#### A masked pattern was here ####
+POSTHOOK: query: select example_add10(x) as a,example_add10(y) as b from t1 order by a desc,b limit 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1
+#### A masked pattern was here ####
+18 28.0
+18 38.0
+17 27.0
+13 23.0
+12 22.0
+11 21.0
+PREHOOK: query: drop table t1
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@t1
+PREHOOK: Output: default@t1
+POSTHOOK: query: drop table t1
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: default@t1
+PREHOOK: query: drop temporary function example_add10
+PREHOOK: type: DROPFUNCTION
+PREHOOK: Output: example_add10
+POSTHOOK: query: drop temporary function example_add10
+POSTHOOK: type: DROPFUNCTION
+POSTHOOK: Output: example_add10
http://git-wip-us.apache.org/repos/asf/hive/blob/5cd48911/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
index b779f3f..6251861 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientUtilities.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,15 +44,18 @@ public class SparkClientUtilities {
* Add new elements to the classpath.
*
* @param newPaths Map of classpath elements and corresponding timestamp
+ * @return locally accessible files corresponding to the newPaths
*/
- public static void addToClassPath(Map<String, Long> newPaths, Configuration conf, File localTmpDir)
- throws Exception {
+ public static List<String> addToClassPath(Map<String, Long> newPaths, Configuration conf,
+ File localTmpDir) throws Exception {
URLClassLoader loader = (URLClassLoader) Thread.currentThread().getContextClassLoader();
List<URL> curPath = Lists.newArrayList(loader.getURLs());
+ List<String> localNewPaths = new ArrayList<>();
boolean newPathAdded = false;
for (Map.Entry<String, Long> entry : newPaths.entrySet()) {
URL newUrl = urlFromPathString(entry.getKey(), entry.getValue(), conf, localTmpDir);
+ localNewPaths.add(newUrl.toString());
if (newUrl != null && !curPath.contains(newUrl)) {
curPath.add(newUrl);
LOG.info("Added jar[" + newUrl + "] to classpath.");
@@ -64,6 +68,7 @@ public class SparkClientUtilities {
new URLClassLoader(curPath.toArray(new URL[curPath.size()]), loader);
Thread.currentThread().setContextClassLoader(newLoader);
}
+ return localNewPaths;
}
/**
[10/11] hive git commit: HIVE-9774: Print yarn application id to
console [Spark Branch] (Rui reviewed by Xuefu)
Posted by li...@apache.org.
HIVE-9774: Print yarn application id to console [Spark Branch] (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b9f99591
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b9f99591
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b9f99591
Branch: refs/heads/master
Commit: b9f99591f6350d20902708c28bfbb708d7603c6e
Parents: e1a7503
Author: Rui Li <ru...@intel.com>
Authored: Tue Jan 19 09:51:29 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:53:06 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 22 +++++++++++++++-----
.../spark/status/RemoteSparkJobMonitor.java | 15 +++++++++++++
.../ql/exec/spark/status/SparkJobStatus.java | 2 ++
.../spark/status/impl/LocalSparkJobStatus.java | 5 +++++
.../spark/status/impl/RemoteSparkJobStatus.java | 22 ++++++++++++++++++++
5 files changed, 61 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b9d4b5e..0ff0b39 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -387,6 +387,7 @@ public class HiveConf extends Configuration {
// a symbolic name to reference in the Hive source code. Properties with non-null
// values will override any values set in the underlying Hadoop configuration.
HADOOPBIN("hadoop.bin.path", findHadoopBinary(), "", true),
+ YARNBIN("yarn.bin.path", findYarnBinary(), "", true),
HIVE_FS_HAR_IMPL("fs.har.impl", "org.apache.hadoop.hive.shims.HiveHarFileSystem",
"The implementation for accessing Hadoop Archives. Note that this won't be applicable to Hadoop versions less than 0.20"),
MAPREDMAXSPLITSIZE(FileInputFormat.SPLIT_MAXSIZE, 256000000L, "", true),
@@ -2796,16 +2797,27 @@ public class HiveConf extends Configuration {
}
private static String findHadoopBinary() {
+ String val = findHadoopHome();
+ // if can't find hadoop home we can at least try /usr/bin/hadoop
+ val = (val == null ? File.separator + "usr" : val)
+ + File.separator + "bin" + File.separator + "hadoop";
+ // Launch hadoop command file on windows.
+ return val + (Shell.WINDOWS ? ".cmd" : "");
+ }
+
+ private static String findYarnBinary() {
+ String val = findHadoopHome();
+ val = (val == null ? "yarn" : val + File.separator + "bin" + File.separator + "yarn");
+ return val + (Shell.WINDOWS ? ".cmd" : "");
+ }
+
+ private static String findHadoopHome() {
String val = System.getenv("HADOOP_HOME");
// In Hadoop 1.X and Hadoop 2.X HADOOP_HOME is gone and replaced with HADOOP_PREFIX
if (val == null) {
val = System.getenv("HADOOP_PREFIX");
}
- // and if all else fails we can at least try /usr/bin/hadoop
- val = (val == null ? File.separator + "usr" : val)
- + File.separator + "bin" + File.separator + "hadoop";
- // Launch hadoop command file on windows.
- return val + (Shell.WINDOWS ? ".cmd" : "");
+ return val;
}
public String getDefaultValue() {
http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index fb0498a..6990e80 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,10 +34,12 @@ import org.apache.spark.JobExecutionStatus;
public class RemoteSparkJobMonitor extends SparkJobMonitor {
private RemoteSparkJobStatus sparkJobStatus;
+ private final HiveConf hiveConf;
public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) {
super(hiveConf);
this.sparkJobStatus = sparkJobStatus;
+ this.hiveConf = hiveConf;
}
@Override
@@ -77,6 +79,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
+ printAppInfo();
// print job stages.
console.printInfo("\nQuery Hive on Spark job["
+ sparkJobStatus.getJobId() + "] stages:");
@@ -137,4 +140,16 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB);
return rc;
}
+
+ private void printAppInfo() {
+ String sparkMaster = hiveConf.get("spark.master");
+ if (sparkMaster != null && sparkMaster.startsWith("yarn")) {
+ String appID = sparkJobStatus.getAppID();
+ if (appID != null) {
+ console.printInfo("Running with YARN Application = " + appID);
+ console.printInfo("Kill Command = " +
+ HiveConf.getVar(hiveConf, HiveConf.ConfVars.YARNBIN) + " application -kill " + appID);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
index fa45ec8..7959089 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -29,6 +29,8 @@ import java.util.Map;
*/
public interface SparkJobStatus {
+ String getAppID();
+
int getJobId();
JobExecutionStatus getState() throws HiveException;
http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
index ebc5c16..3c15521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
@@ -66,6 +66,11 @@ public class LocalSparkJobStatus implements SparkJobStatus {
}
@Override
+ public String getAppID() {
+ return sparkContext.sc().applicationId();
+ }
+
+ @Override
public int getJobId() {
return jobId;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/b9f99591/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index e8d581f..d84c026 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -62,6 +62,17 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
}
@Override
+ public String getAppID() {
+ Future<String> getAppID = sparkClient.run(new GetAppIDJob());
+ try {
+ return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ LOG.warn("Failed to get APP ID.", e);
+ return null;
+ }
+ }
+
+ @Override
public int getJobId() {
return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
}
@@ -268,4 +279,15 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
}
};
}
+
+ private static class GetAppIDJob implements Job<String> {
+
+ public GetAppIDJob() {
+ }
+
+ @Override
+ public String call(JobContext jc) throws Exception {
+ return jc.sc().sc().applicationId();
+ }
+ }
}
[11/11] hive git commit: HIVE-12888: TestSparkNegativeCliDriver does
not run in Spark mode[Spark Branch] (Chengxiang via Xuefu)
Posted by li...@apache.org.
HIVE-12888: TestSparkNegativeCliDriver does not run in Spark mode[Spark Branch] (Chengxiang via Xuefu)
Conflicts:
ql/src/test/templates/TestNegativeCliDriver.vm
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/277825fe
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/277825fe
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/277825fe
Branch: refs/heads/master
Commit: 277825fe8ea706ffbbb0d35d484097bc9e01a70f
Parents: b9f9959
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Tue Jan 26 19:31:49 2016 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:55:20 2016 +0800
----------------------------------------------------------------------
ql/src/test/templates/TestNegativeCliDriver.vm | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/277825fe/ql/src/test/templates/TestNegativeCliDriver.vm
----------------------------------------------------------------------
diff --git a/ql/src/test/templates/TestNegativeCliDriver.vm b/ql/src/test/templates/TestNegativeCliDriver.vm
index d1cbbfd..592d64f 100644
--- a/ql/src/test/templates/TestNegativeCliDriver.vm
+++ b/ql/src/test/templates/TestNegativeCliDriver.vm
@@ -34,13 +34,17 @@ public class $className {
static {
MiniClusterType miniMR = MiniClusterType.valueForString("$clusterMode");
+ String hiveConfDir = "$hiveConfDir";
String initScript = "$initScript";
String cleanupScript = "$cleanupScript";
try {
String hadoopVer = "$hadoopVersion";
- qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR, null, hadoopVer,
- initScript, cleanupScript, false, false);
+ if (!hiveConfDir.isEmpty()) {
+ hiveConfDir = HIVE_ROOT + hiveConfDir;
+ }
+ qt = new QTestUtil((HIVE_ROOT + "$resultsDir"), (HIVE_ROOT + "$logDir"), miniMR,
+ hiveConfDir, hadoopVer, initScript, cleanupScript, false, false);
// do a one time initialization
qt.cleanUp();
qt.createSources();
[05/11] hive git commit: HIVE-12568: Provide an option to specify
network interface used by Spark remote client [Spark Branch] (reviewed by
Jimmy)
Posted by li...@apache.org.
HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f57569b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f57569b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f57569b
Branch: refs/heads/master
Commit: 9f57569b0f648bb5596df60e0a62db06930778ea
Parents: 72e070f
Author: xzhang <xz...@xzdt>
Authored: Mon Dec 7 11:10:25 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:50:07 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hive/common/ServerUtils.java | 19 +++++++
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 ++
.../service/cli/thrift/ThriftCLIService.java | 15 +++---
.../hive/spark/client/rpc/RpcConfiguration.java | 57 +++++++-------------
4 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.common;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public class ServerUtils {
}
}
+ /**
+ * Get the Inet address of the machine of the given host name.
+ * @param hostname The name of the host
+ * @return The network address of the host
+ * @throws UnknownHostException
+ */
+ public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+ InetAddress serverIPAddress;
+ if (hostname != null && !hostname.isEmpty()) {
+ serverIPAddress = InetAddress.getByName(hostname);
+ } else {
+ serverIPAddress = InetAddress.getLocalHost();
+ }
+ return serverIPAddress;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e10fd4..c4034a5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2601,6 +2601,11 @@ public class HiveConf extends Configuration {
"Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
"Name of the SASL mechanism to use for authentication."),
+ SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+ "The server address of HiveServer2 host to be used for communication between Hive client and remote Spark driver. " +
+ "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host. " +
+ "This is only necessary if the host has multiple network addresses and if a different network address other than " +
+ "hive.server2.thrift.bind.host is to be used."),
SPARK_DYNAMIC_PARTITION_PRUNING(
"hive.spark.dynamic.partition.pruning", false,
"When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 78b4b31..5c8fa3c 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.ServiceException;
import org.apache.hive.service.ServiceUtils;
@@ -203,21 +204,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
@Override
public synchronized void init(HiveConf hiveConf) {
this.hiveConf = hiveConf;
- // Initialize common server configs needed in both binary & http modes
- String portString;
- hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+
+ String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
if (hiveHost == null) {
hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
}
try {
- if (hiveHost != null && !hiveHost.isEmpty()) {
- serverIPAddress = InetAddress.getByName(hiveHost);
- } else {
- serverIPAddress = InetAddress.getLocalHost();
- }
+ serverIPAddress = ServerUtils.getHostAddress(hiveHost);
} catch (UnknownHostException e) {
throw new ServiceException(e);
}
+
+ // Initialize common server configs needed in both binary & http modes
+ String portString;
// HTTP mode
if (HiveServer2.isHTTPTransportMode(hiveConf)) {
workerKeepAliveTime =
http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 9c8cea0..e387659 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,20 +18,19 @@
package org.apache.hive.spark.client.rpc;
import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+
import javax.security.sasl.Sasl;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -49,15 +48,14 @@ public final class RpcConfiguration {
HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
- HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+ HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+ HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
);
public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
);
- public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
-
/** Prefix for other SASL options. */
public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl.";
@@ -91,39 +89,22 @@ public final class RpcConfiguration {
return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
}
+ /**
+ * Here we assume that the remote driver will connect back to HS2 using the same network interface
+ * as if it were just an HS2 client. If this isn't true, we can have a separate configuration for that.
+ * For now, I think we are okay.
+ * @return server host name in the network
+ * @throws IOException
+ */
String getServerAddress() throws IOException {
- String value = config.get(SERVER_LISTEN_ADDRESS_KEY);
- if (value != null) {
- return value;
- }
-
- InetAddress address = InetAddress.getLocalHost();
- if (address.isLoopbackAddress()) {
- // Address resolves to something like 127.0.1.1, which happens on Debian;
- // try to find a better address using the local network interfaces
- Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
- while (ifaces.hasMoreElements()) {
- NetworkInterface ni = ifaces.nextElement();
- Enumeration<InetAddress> addrs = ni.getInetAddresses();
- while (addrs.hasMoreElements()) {
- InetAddress addr = addrs.nextElement();
- if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
- && addr instanceof Inet4Address) {
- // We've found an address that looks reasonable!
- LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
- + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
- ni.getName());
- LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
- return addr.getHostAddress();
- }
- }
+ String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS);
+ if(StringUtils.isEmpty(hiveHost)) {
+ hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+ if (hiveHost == null) {
+ hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
}
}
-
- LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
- + " any external IP address!", address.getHostName());
- LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
- return address.getHostName();
+ return ServerUtils.getHostAddress(hiveHost).getHostName();
}
String getRpcChannelLogLevel() {
[07/11] hive git commit: HIVE-12811: Name yarn application name more
meaning than just "Hive on Spark" (Rui reviewed by Xuefu)
Posted by li...@apache.org.
HIVE-12811: Name yarn application name more meaning than just "Hive on Spark" (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e47e1142
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e47e1142
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e47e1142
Branch: refs/heads/master
Commit: e47e11424f8867fb88266bda796f5639ccdd6d28
Parents: bb5ad57
Author: Rui Li <ru...@intel.com>
Authored: Wed Jan 13 10:41:27 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:51:15 2016 +0800
----------------------------------------------------------------------
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 3 ++-
.../hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java | 7 ++++++-
2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e47e1142/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c4034a5..b9d4b5e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2925,7 +2925,8 @@ public class HiveConf extends Configuration {
private boolean isSparkRelatedConfig(String name) {
boolean result = false;
if (name.startsWith("spark")) { // Spark property.
- result = true;
+ // for now we don't support changing spark app name on the fly
+ result = !name.equals("spark.app.name");
} else if (name.startsWith("yarn")) { // YARN property in Spark on YARN mode.
String sparkMaster = get("spark.master");
if (sparkMaster != null &&
http://git-wip-us.apache.org/repos/asf/hive/blob/e47e1142/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index 9b2dce3..a832bf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -72,7 +72,12 @@ public class HiveSparkClientFactory {
// set default spark configurations.
sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
- sparkConf.put("spark.app.name", SPARK_DEFAULT_APP_NAME);
+ final String appNameKey = "spark.app.name";
+ String appName = hiveConf.get(appNameKey);
+ if (appName == null) {
+ appName = SPARK_DEFAULT_APP_NAME;
+ }
+ sparkConf.put(appNameKey, appName);
sparkConf.put("spark.serializer", SPARK_DEFAULT_SERIALIZER);
sparkConf.put("spark.kryo.referenceTracking", SPARK_DEFAULT_REFERENCE_TRACKING);
[02/11] hive git commit: HIVE-12466: SparkCounter not initialized
error (Rui via Chengxiang)
Posted by li...@apache.org.
HIVE-12466: SparkCounter not initialized error (Rui via Chengxiang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/120df071
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/120df071
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/120df071
Branch: refs/heads/master
Commit: 120df07186703dd2ecc930bbc5dfda191ad40773
Parents: 5cd4891
Author: chengxiang <ch...@apache.com>
Authored: Wed Nov 25 11:07:12 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:22:09 2016 +0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 17 ++++++++++-------
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 14 +++++++++-----
.../hadoop/hive/ql/exec/spark/SparkTask.java | 4 ++--
3 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 14121b6..0899793 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -441,13 +441,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
- String suffix = Integer.toString(conf.getDestTableId());
- String fullName = conf.getTableInfo().getTableName();
- if (fullName != null) {
- suffix = suffix + "_" + fullName.toLowerCase();
- }
-
- statsMap.put(Counter.RECORDS_OUT + "_" + suffix, row_count);
+ statsMap.put(getCounterName(Counter.RECORDS_OUT), row_count);
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -456,6 +450,15 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
+ public String getCounterName(Counter counter) {
+ String suffix = Integer.toString(conf.getDestTableId());
+ String fullName = conf.getTableInfo().getTableName();
+ if (fullName != null) {
+ suffix = suffix + "_" + fullName.toLowerCase();
+ }
+ return counter + "_" + suffix;
+ }
+
private void logOutputFormatError(Configuration hconf, HiveException ex) {
StringWriter errorWriter = new StringWriter();
errorWriter.append("Failed to create output format; configuration: ");
http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 74b4802..e692460 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -170,11 +170,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
cntr = 1;
logEveryNRows = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_LOG_N_RECORDS);
- String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
- if (context != null && !context.isEmpty()) {
- context = "_" + context.replace(" ","_");
- }
- statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+ statsMap.put(getCounterName(Counter.RECORDS_OUT_INTERMEDIATE, hconf), recordCounter);
List<ExprNodeDesc> keys = conf.getKeyCols();
@@ -256,6 +252,14 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
}
}
+ public String getCounterName(Counter counter, Configuration hconf) {
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ", "_");
+ }
+ return counter + context;
+ }
+
/**
* Initializes array of ExprNodeEvaluator. Adds Union field for distinct
http://git-wip-us.apache.org/repos/asf/hive/blob/120df071/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index eb93aca..faa075a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -386,11 +386,11 @@ public class SparkTask extends Task<SparkWork> {
for (Operator<? extends OperatorDesc> operator : work.getAllOperators()) {
if (operator instanceof FileSinkOperator) {
for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
- hiveCounters.add(counter.toString());
+ hiveCounters.add(((FileSinkOperator) operator).getCounterName(counter));
}
} else if (operator instanceof ReduceSinkOperator) {
for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
- hiveCounters.add(counter.toString());
+ hiveCounters.add(((ReduceSinkOperator) operator).getCounterName(counter, conf));
}
} else if (operator instanceof ScriptOperator) {
for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
[09/11] hive git commit: HIVE-12611: Make sure spark.yarn.queue is
effective and takes the value from mapreduce.job.queuename if given [Spark
Branch] (Rui reviewed by Xuefu)
Posted by li...@apache.org.
HIVE-12611: Make sure spark.yarn.queue is effective and takes the value from mapreduce.job.queuename if given [Spark Branch] (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e1a7503b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e1a7503b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e1a7503b
Branch: refs/heads/master
Commit: e1a7503b81a5509e391940709c8f9a6f552646a7
Parents: 34b41e3
Author: Rui Li <ru...@intel.com>
Authored: Mon Jan 18 09:14:56 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:52:38 2016 +0800
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e1a7503b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index a832bf6..993d02b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -174,6 +174,15 @@ public class HiveSparkClientFactory {
classes.add(HiveKey.class.getName());
sparkConf.put("spark.kryo.classesToRegister", Joiner.on(",").join(classes));
+ // set yarn queue name
+ final String sparkQueueNameKey = "spark.yarn.queue";
+ if (sparkMaster.startsWith("yarn") && hiveConf.get(sparkQueueNameKey) == null) {
+ String queueName = hiveConf.get("mapreduce.job.queuename");
+ if (queueName != null) {
+ sparkConf.put(sparkQueueNameKey, queueName);
+ }
+ }
+
return sparkConf;
}
[06/11] hive git commit: HIVE-12708: Hive on Spark doesn't work with
Kerboresed HBase [Spark Branch] (reviewed by Szehon)
Posted by li...@apache.org.
HIVE-12708: Hive on Spark doesn't work with Kerboresed HBase [Spark Branch] (reviewed by Szehon)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb5ad573
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb5ad573
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb5ad573
Branch: refs/heads/master
Commit: bb5ad5733d0d5dfc2489deb658beea609405a29c
Parents: 9f57569
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Fri Dec 18 14:37:03 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:50:43 2016 +0800
----------------------------------------------------------------------
.../hive/ql/exec/spark/HiveSparkClientFactory.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bb5ad573/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index ec0fdea..9b2dce3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -30,6 +30,7 @@ import org.apache.commons.compress.utils.CharsetNames;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -67,6 +68,7 @@ public class HiveSparkClientFactory {
public static Map<String, String> initiateSparkConf(HiveConf hiveConf) {
Map<String, String> sparkConf = new HashMap<String, String>();
+ HBaseConfiguration.addHbaseResources(hiveConf);
// set default spark configurations.
sparkConf.put("spark.master", SPARK_DEFAULT_MASTER);
@@ -139,7 +141,16 @@ public class HiveSparkClientFactory {
if (value != null && !value.isEmpty()) {
sparkConf.put("spark.hadoop." + propertyName, value);
}
+ } else if (propertyName.startsWith("hbase")) {
+ // Add HBase related configuration to Spark because in security mode, Spark needs it
+ // to generate hbase delegation token for Spark. This is a temp solution to deal with
+ // Spark problem.
+ String value = hiveConf.get(propertyName);
+ sparkConf.put("spark.hadoop." + propertyName, value);
+ LOG.info(String.format(
+ "load HBase configuration (%s -> %s).", propertyName, value));
}
+
if (RpcConfiguration.HIVE_SPARK_RSC_CONFIGS.contains(propertyName)) {
String value = RpcConfiguration.getValue(hiveConf, propertyName);
sparkConf.put(propertyName, value);
[08/11] hive git commit: HIVE-12828: Update Spark version to 1.6 (Rui
reviewed by Xuefu)
Posted by li...@apache.org.
HIVE-12828: Update Spark version to 1.6 (Rui reviewed by Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/34b41e38
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/34b41e38
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/34b41e38
Branch: refs/heads/master
Commit: 34b41e389ab331f1d61b077fbea8cb855775698f
Parents: e47e114
Author: Rui Li <ru...@intel.com>
Authored: Fri Jan 15 21:24:39 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:51:55 2016 +0800
----------------------------------------------------------------------
data/conf/spark/yarn-client/hive-site.xml | 17 ++++++++++++++++-
pom.xml | 2 +-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 2 ++
3 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/data/conf/spark/yarn-client/hive-site.xml
----------------------------------------------------------------------
diff --git a/data/conf/spark/yarn-client/hive-site.xml b/data/conf/spark/yarn-client/hive-site.xml
index bc42f0d..f1d9ddc 100644
--- a/data/conf/spark/yarn-client/hive-site.xml
+++ b/data/conf/spark/yarn-client/hive-site.xml
@@ -231,7 +231,22 @@
<property>
<name>spark.executor.memory</name>
- <value>512m</value>
+ <value>1g</value>
+</property>
+
+<property>
+ <name>spark.yarn.executor.memoryOverhead</name>
+ <value>0</value>
+</property>
+
+<property>
+ <name>spark.driver.memory</name>
+ <value>1g</value>
+</property>
+
+<property>
+ <name>spark.yarn.driver.memoryOverhead</name>
+ <value>0</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2066518..2d2a3de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
<ST4.version>4.0.4</ST4.version>
<tez.version>0.8.2</tez.version>
<super-csv.version>2.2.0</super-csv.version>
- <spark.version>1.5.0</spark.version>
+ <spark.version>1.6.0</spark.version>
<scala.binary.version>2.10</scala.binary.version>
<scala.version>2.10.4</scala.version>
<tempus-fugit.version>1.1</tempus-fugit.version>
http://git-wip-us.apache.org/repos/asf/hive/blob/34b41e38/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
----------------------------------------------------------------------
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index c03ae35..31060a2 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -526,6 +526,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
mr = new MiniSparkOnYARNCluster("sparkOnYarn");
conf.set("fs.defaultFS", nameNode);
conf.set("yarn.resourcemanager.scheduler.class", "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+ // disable resource monitoring, although it should be off by default
+ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, false);
configureImpersonation(conf);
mr.init(conf);
mr.start();
[04/11] hive git commit: HIVE-12515: Clean the SparkCounters related
code after remove counter based stats collection[Spark Branch] (Rui reviewed
by Chengxiang & Xuefu)
Posted by li...@apache.org.
HIVE-12515: Clean the SparkCounters related code after remove counter based stats collection[Spark Branch] (Rui reviewed by Chengxiang & Xuefu)
Conflicts:
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/72e070f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/72e070f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/72e070f4
Branch: refs/heads/master
Commit: 72e070f432db3eadc9b6d538157714cf8f47fca1
Parents: 4a41ba5
Author: Rui Li <ru...@intel.com>
Authored: Thu Dec 3 16:37:05 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:49:12 2016 +0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 4 -
.../hadoop/hive/ql/exec/spark/SparkTask.java | 146 +------------------
2 files changed, 1 insertion(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 4df7d25..ec6a2c7 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1091,8 +1091,6 @@ spark.query.files=add_part_multiple.q, \
stats7.q, \
stats8.q, \
stats9.q, \
- stats_counter.q, \
- stats_counter_partitioned.q, \
stats_noscan_1.q, \
stats_noscan_2.q, \
stats_only_null.q, \
@@ -1298,8 +1296,6 @@ miniSparkOnYarn.query.files=auto_sortmerge_join_16.q,\
schemeAuthority2.q,\
scriptfile1.q,\
scriptfile1_win.q,\
- stats_counter.q,\
- stats_counter_partitioned.q,\
temp_table_external.q,\
truncate_column_buckets.q,\
uber_reduce.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/72e070f4/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index faa075a..26cce1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,11 +28,7 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -44,7 +38,6 @@ import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
-import org.apache.hadoop.hive.ql.exec.StatsTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
@@ -58,25 +51,15 @@ import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.spark.counter.SparkCounters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
@@ -86,7 +69,6 @@ public class SparkTask extends Task<SparkWork> {
private static final LogHelper console = new LogHelper(LOG);
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final long serialVersionUID = 1L;
- private SparkCounters sparkCounters;
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext,
@@ -106,7 +88,7 @@ public class SparkTask extends Task<SparkWork> {
sparkSession = SparkUtilities.getSparkSession(conf, sparkSessionManager);
SparkWork sparkWork = getWork();
- sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
+ sparkWork.setRequiredCounterPrefix(getOperatorCounters());
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
@@ -116,8 +98,6 @@ public class SparkTask extends Task<SparkWork> {
rc = jobRef.monitorJob();
SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
if (rc == 0) {
- sparkCounters = sparkJobStatus.getCounter();
- // for RSC, we should get the counters after job has finished
SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics();
if (LOG.isInfoEnabled() && sparkStatistics != null) {
LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId()));
@@ -233,10 +213,6 @@ public class SparkTask extends Task<SparkWork> {
return ((ReduceWork) children.get(0)).getReducer();
}
- public SparkCounters getSparkCounters() {
- return sparkCounters;
- }
-
/**
* Set the number of reducers for the spark work.
*/
@@ -250,126 +226,6 @@ public class SparkTask extends Task<SparkWork> {
console.printInfo(" set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
}
- private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
- Map<String, List<String>> counters = getOperatorCounters();
- StatsTask statsTask = getStatsTaskInChildTasks(this);
- String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
- // fetch table prefix if SparkTask try to gather table statistics based on counter.
- if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
- List<String> prefixes = getRequiredCounterPrefix(statsTask);
- for (String prefix : prefixes) {
- List<String> counterGroup = counters.get(prefix);
- if (counterGroup == null) {
- counterGroup = new LinkedList<String>();
- counters.put(prefix, counterGroup);
- }
- counterGroup.add(StatsSetupConst.ROW_COUNT);
- counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
- }
- }
- return counters;
- }
-
- private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
- List<String> prefixs = new LinkedList<String>();
- StatsWork statsWork = statsTask.getWork();
- String tablePrefix = getTablePrefix(statsWork);
- List<Map<String, String>> partitionSpecs = getPartitionSpecs(statsWork);
-
- if (partitionSpecs == null) {
- prefixs.add(tablePrefix.endsWith(Path.SEPARATOR) ? tablePrefix : tablePrefix + Path.SEPARATOR);
- } else {
- for (Map<String, String> partitionSpec : partitionSpecs) {
- String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partitionSpec));
- prefixs.add(prefixWithPartition.endsWith(Path.SEPARATOR) ? prefixWithPartition : prefixWithPartition + Path.SEPARATOR);
- }
- }
-
- return prefixs;
- }
-
- private String getTablePrefix(StatsWork work) throws HiveException {
- String tableName;
- if (work.getLoadTableDesc() != null) {
- tableName = work.getLoadTableDesc().getTable().getTableName();
- } else if (work.getTableSpecs() != null) {
- tableName = work.getTableSpecs().tableName;
- } else {
- tableName = work.getLoadFileDesc().getDestinationCreateTable();
- }
- Table table;
- try {
- table = db.getTable(tableName);
- } catch (HiveException e) {
- LOG.warn("Failed to get table:" + tableName);
- // For CTAS query, table does not exist in this period, just use table name as prefix.
- return tableName.toLowerCase();
- }
- return table.getDbName() + "." + table.getTableName();
- }
-
- private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> rootTask) {
-
- List<Task<? extends Serializable>> childTasks = rootTask.getChildTasks();
- if (childTasks == null) {
- return null;
- }
- for (Task<? extends Serializable> task : childTasks) {
- if (task instanceof StatsTask) {
- return (StatsTask) task;
- } else {
- Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
- if (childTask instanceof StatsTask) {
- return (StatsTask) childTask;
- } else {
- continue;
- }
- }
- }
-
- return null;
- }
-
- private List<Map<String, String>> getPartitionSpecs(StatsWork work) throws HiveException {
- if (work.getLoadFileDesc() != null) {
- return null; //we are in CTAS, so we know there are no partitions
- }
- Table table;
- List<Map<String, String>> partitionSpecs = new ArrayList<Map<String, String>>();
-
- if (work.getTableSpecs() != null) {
-
- // ANALYZE command
- TableSpec tblSpec = work.getTableSpecs();
- table = tblSpec.tableHandle;
- if (!table.isPartitioned()) {
- return null;
- }
- // get all partitions that matches with the partition spec
- List<Partition> partitions = tblSpec.partitions;
- if (partitions != null) {
- for (Partition partition : partitions) {
- partitionSpecs.add(partition.getSpec());
- }
- }
- } else if (work.getLoadTableDesc() != null) {
-
- // INSERT OVERWRITE command
- LoadTableDesc tbd = work.getLoadTableDesc();
- table = db.getTable(tbd.getTable().getTableName());
- if (!table.isPartitioned()) {
- return null;
- }
- DynamicPartitionCtx dpCtx = tbd.getDPCtx();
- if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
- // we could not get dynamic partition information before SparkTask execution.
- } else { // static partition
- partitionSpecs.add(tbd.getPartitionSpec());
- }
- }
- return partitionSpecs;
- }
-
private Map<String, List<String>> getOperatorCounters() {
String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
Map<String, List<String>> counters = new HashMap<String, List<String>>();
[03/11] hive git commit: HIVE-12554: Fix Spark branch build after
merge [Spark Branch] (Rui via Xuefu)
Posted by li...@apache.org.
HIVE-12554: Fix Spark branch build after merge [Spark Branch] (Rui via Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a41ba5c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a41ba5c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a41ba5c
Branch: refs/heads/master
Commit: 4a41ba5cf029cd39d6f3c1765e2d9d4f090eab58
Parents: 120df07
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Tue Dec 1 10:49:04 2015 -0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu Jan 28 14:22:45 2016 +0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java | 3 ++-
ql/src/test/results/clientpositive/gen_udf_example_add10.q.out | 1 +
.../test/results/clientpositive/spark/gen_udf_example_add10.q.out | 1 +
3 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
index d215873..ec0fdea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
@@ -27,6 +27,7 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.compress.utils.CharsetNames;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -133,7 +134,7 @@ public class HiveSparkClientFactory {
LOG.info(String.format(
"load yarn property from hive configuration in %s mode (%s -> %s).",
sparkMaster, propertyName, value));
- } else if (propertyName.equals(HiveConf.ConfVars.HADOOPFS.varname)) {
+ } else if (propertyName.equals(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY)) {
String value = hiveConf.get(propertyName);
if (value != null && !value.isEmpty()) {
sparkConf.put("spark.hadoop." + propertyName, value);
http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
index 984554d..cab2ec8 100644
--- a/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/gen_udf_example_add10.q.out
@@ -43,6 +43,7 @@ STAGE PLANS:
key expressions: _col0 (type: int), _col1 (type: double)
sort order: -+
Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ TopN Hash Memory Usage: 0.1
Reduce Operator Tree:
Select Operator
expressions: KEY.reducesinkkey0 (type: int), KEY.reducesinkkey1 (type: double)
http://git-wip-us.apache.org/repos/asf/hive/blob/4a41ba5c/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
index 05ec1f5..493d0a4 100644
--- a/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
+++ b/ql/src/test/results/clientpositive/spark/gen_udf_example_add10.q.out
@@ -48,6 +48,7 @@ STAGE PLANS:
key expressions: _col0 (type: int), _col1 (type: double)
sort order: -+
Statistics: Num rows: 2 Data size: 30 Basic stats: COMPLETE Column stats: NONE
+ TopN Hash Memory Usage: 0.1
Reducer 2
Reduce Operator Tree:
Select Operator