Posted to commits@hive.apache.org by sz...@apache.org on 2015/01/08 03:00:12 UTC
svn commit: r1650201 [1/3] - in /hive/branches/spark: itests/hive-unit/src/test/java/org/apache/hive/jdbc/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/se...
Author: szehon
Date: Thu Jan 8 02:00:11 2015
New Revision: 1650201
URL: http://svn.apache.org/r1650201
Log:
HIVE-9281 : Code cleanup [Spark Branch] (Szehon, reviewed by Xuefu)
Modified:
hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkCrossProductCheck.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinHintOptimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkReduceSinkMapJoinProc.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSMBJoinHintOptimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinOptimizer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SplitSparkWorkResolver.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWorkWalker.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkFileSinkProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkEdgeProperty.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/Job.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MetricsCollection.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/MonitorCallback.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClient.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/InputMetrics.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/metrics/Metrics.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/Rpc.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcDispatcher.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
hive/branches/spark/spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestMetricsCollection.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestKryoMessageCodec.java
hive/branches/spark/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
Modified: hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java (original)
+++ hive/branches/spark/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithLocalClusterSpark.java Thu Jan 8 02:00:11 2015
@@ -124,7 +124,7 @@ public class TestJdbcWithLocalClusterSpa
}
/**
- * Verify that the connection to HS2 with MiniMr is successful
+ * Verify that the connection to HS2 with MiniMr is successful.
* @throws Exception
*/
@Test
@@ -134,7 +134,7 @@ public class TestJdbcWithLocalClusterSpa
}
/**
- * Run nonMr query
+ * Run nonMr query.
* @throws Exception
*/
@Test
@@ -147,15 +147,15 @@ public class TestJdbcWithLocalClusterSpa
}
/**
- * Run nonMr query
+ * Run nonMr query.
* @throws Exception
*/
@Test
public void testSparkQuery() throws Exception {
String tableName = "testTab2";
String resultVal = "val_238";
- String queryStr = "SELECT * FROM " + tableName +
- " where value = '" + resultVal + "'";
+ String queryStr = "SELECT * FROM " + tableName
+ + " where value = '" + resultVal + "'";
testKvQuery(tableName, queryStr, resultVal);
}
@@ -233,8 +233,8 @@ public class TestJdbcWithLocalClusterSpa
+ dataFilePath.toString() + "' into table " + tempTableName);
String resultVal = "val_238";
- String queryStr = "SELECT * FROM " + tempTableName +
- " where value = '" + resultVal + "'";
+ String queryStr = "SELECT * FROM " + tempTableName
+ + " where value = '" + resultVal + "'";
verifyResult(queryStr, resultVal, 2);
// A second connection should not be able to see the table
@@ -244,8 +244,7 @@ public class TestJdbcWithLocalClusterSpa
stmt2.execute("USE " + dbName);
boolean gotException = false;
try {
- ResultSet res;
- res = stmt2.executeQuery(queryStr);
+ stmt2.executeQuery(queryStr);
} catch (SQLException err) {
// This is expected to fail.
assertTrue("Expecting table not found error, instead got: " + err,
@@ -266,7 +265,7 @@ public class TestJdbcWithLocalClusterSpa
}
/**
- * Verify if the given property contains the expected value
+ * Verify if the given property contains the expected value.
* @param propertyName
* @param expectedValue
* @throws Exception
@@ -275,7 +274,7 @@ public class TestJdbcWithLocalClusterSpa
Statement stmt = hs2Conn .createStatement();
ResultSet res = stmt.executeQuery("set " + propertyName);
assertTrue(res.next());
- String results[] = res.getString(1).split("=");
+ String[] results = res.getString(1).split("=");
assertEquals("Property should be set", results.length, 2);
assertEquals("Property should be set", expectedValue, results[1]);
}
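(For context: two mechanical conventions recur throughout this commit and are both visible in the hunks above — wrapped string concatenations put the "+" at the start of the continuation line, and array declarations put the brackets on the type rather than the variable. A minimal compilable illustration; the class and variable names here are ours, not from the commit.)

public class StyleSketch {
  public static void main(String[] args) {
    String tableName = "testTab2";
    String resultVal = "val_238";
    // Wrapped concatenation: the operator leads the continuation line.
    String queryStr = "SELECT * FROM " + tableName
        + " where value = '" + resultVal + "'";
    // Java-style array declaration: brackets on the type, not the name.
    String[] results = "value=val_238".split("=");
    System.out.println(queryStr + " matched " + results[1]);
  }
}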
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java Thu Jan 8 02:00:11 2015
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.serde2.obj
public class SparkHashTableSinkOperator
extends TerminalOperator<SparkHashTableSinkDesc> implements Serializable {
+ private static final int MIN_REPLICATION = 10;
private static final long serialVersionUID = 1L;
private final String CLASS_NAME = this.getClass().getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
@@ -122,7 +123,6 @@ public class SparkHashTableSinkOperator
+ "-" + Math.abs(Utilities.randGen.nextInt()));
try {
// This will guarantee file name uniqueness.
- // TODO: can we use the task id, which should be unique
if (fs.createNewFile(path)) {
break;
}
@@ -131,10 +131,10 @@ public class SparkHashTableSinkOperator
}
// TODO find out numOfPartitions for the big table
int numOfPartitions = replication;
- replication = (short)Math.min(10, numOfPartitions);
+ replication = (short) Math.min(MIN_REPLICATION, numOfPartitions);
}
- htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
- " with group count: " + tableContainer.size() + " into file: " + path);
+ htsOperator.console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag
+ + " with group count: " + tableContainer.size() + " into file: " + path);
// get the hashtable file and path
OutputStream os = null;
@@ -153,8 +153,8 @@ public class SparkHashTableSinkOperator
}
tableContainer.clear();
FileStatus status = fs.getFileStatus(path);
- htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path +
- " (" + status.getLen() + " bytes)");
+ htsOperator.console.printInfo(Utilities.now() + "\tUploaded 1 File to: " + path
+ + " (" + status.getLen() + " bytes)");
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_FLUSH_HASHTABLE + this.getName());
}
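(For context: the retained comment "This will guarantee file name uniqueness." relies on FileSystem.createNewFile(Path), which creates the file atomically and returns false if the path already exists, so retrying with a fresh random suffix eventually yields a path no other task holds. A minimal sketch of that pattern, assuming a caller-supplied FileSystem and directory; names are illustrative, not the actual Hive code.)

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UniqueFileSketch {
  private static final Random RAND = new Random();

  // Retry with a new random suffix until the atomic create succeeds.
  public static Path createUniqueFile(FileSystem fs, Path dir, String prefix)
      throws IOException {
    while (true) {
      Path candidate = new Path(dir, prefix + "-" + Math.abs(RAND.nextInt()));
      if (fs.createNewFile(candidate)) { // false if the path already exists
        return candidate;
      }
    }
  }
}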
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java Thu Jan 8 02:00:11 2015
@@ -101,7 +101,7 @@ public class HashTableLoader implements
bigInputPath = null;
} else {
Set<String> aliases =
- ((SparkBucketMapJoinContext)mapJoinCtx).getPosToAliasMap().get(pos);
+ ((SparkBucketMapJoinContext) mapJoinCtx).getPosToAliasMap().get(pos);
String alias = aliases.iterator().next();
// Any one small table input path
String smallInputPath =
@@ -110,7 +110,7 @@ public class HashTableLoader implements
}
}
String fileName = localWork.getBucketFileName(bigInputPath);
- Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte)pos, fileName);
+ Path path = Utilities.generatePath(baseDir, desc.getDumpFilePrefix(), (byte) pos, fileName);
LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
mapJoinTables[pos] = mapJoinTableSerdes[pos].load(fs, path);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Thu Jan 8 02:00:11 2015
@@ -70,13 +70,15 @@ public abstract class HiveBaseFunctionRe
/** Process the given record. */
protected abstract void processNextRecord(T inputRecord) throws IOException;
- /** Is the current state of the record processor done? */
+ /**
+ * @return true if current state of the record processor is done.
+ */
protected abstract boolean processingDone();
- /** Close the record processor */
+ /** Close the record processor. */
protected abstract void closeRecordProcessor();
- /** Implement Iterator interface */
+ /** Implement Iterator interface. */
public class ResultIterator implements Iterator {
@Override
public boolean hasNext(){
@@ -98,8 +100,7 @@ public abstract class HiveBaseFunctionRe
return true;
}
} catch (IOException ex) {
- // TODO: better handling of exception.
- throw new RuntimeException("Error while processing input.", ex);
+ throw new IllegalStateException("Error while processing input.", ex);
}
}
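(For context: the three abstract methods shown above define a pull-based pipeline — the iterator drains the input on demand through processNextRecord(), which may buffer any number of outputs, while processingDone()/closeRecordProcessor() handle termination. A stripped-down sketch of that shape; generic types and a queue stand in for Hive's internals.)

import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

abstract class ResultListSketch<I, O> implements Iterable<O> {
  private final Iterator<I> input;
  protected final Queue<O> buffered = new LinkedList<O>();

  ResultListSketch(Iterator<I> input) {
    this.input = input;
  }

  /** Process one input record; may enqueue zero or more outputs. */
  protected abstract void processNextRecord(I record) throws IOException;

  @Override
  public Iterator<O> iterator() {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        try {
          // Pull input until the processor emits something or input runs dry.
          while (buffered.isEmpty() && input.hasNext()) {
            processNextRecord(input.next());
          }
        } catch (IOException ex) {
          // As in the hunk above: surface failures as IllegalStateException.
          throw new IllegalStateException("Error while processing input.", ex);
        }
        return !buffered.isEmpty();
      }

      @Override
      public O next() {
        return buffered.remove();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }
}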
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java Thu Jan 8 02:00:11 2015
@@ -79,7 +79,7 @@ public class HiveKVResultCache {
container.setSerDe(serDe, oi);
container.setTableDesc(tableDesc);
- } catch(Exception ex) {
+ } catch (Exception ex) {
throw new RuntimeException("Failed to create RowContainer", ex);
}
return container;
@@ -114,7 +114,7 @@ public class HiveKVResultCache {
}
try {
container.clearRows();
- } catch(HiveException ex) {
+ } catch (HiveException ex) {
throw new RuntimeException("Failed to clear rows in RowContainer", ex);
}
cursor = 0;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Thu Jan 8 02:00:11 2015
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.io.BytesWritable;
+
import scala.Tuple2;
import java.util.Iterator;
@@ -35,6 +36,7 @@ public class HiveMapFunction extends Hiv
super(jobConfBuffer, sparkReporter);
}
+ @SuppressWarnings("unchecked")
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
@@ -50,7 +52,6 @@ public class HiveMapFunction extends Hiv
}
HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
- //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
mapRecordHandler.init(jobConf, result, sparkReporter);
return result;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Thu Jan 8 02:00:11 2015
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.Reporter;
import scala.Tuple2;
import java.io.IOException;
@@ -32,6 +31,7 @@ public class HiveMapFunctionResultList e
/**
* Instantiate result set Iterable for Map function output.
*
+ * @param conf Hive configuration.
* @param inputIterator Input record iterator.
* @param handler Initialized {@link SparkMapRecordHandler} instance.
*/
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HivePairFlatMapFunction.java Thu Jan 8 02:00:11 2015
@@ -26,13 +26,13 @@ import org.apache.spark.api.java.functio
public abstract class HivePairFlatMapFunction<T, K, V> implements PairFlatMapFunction<T, K, V> {
- private static final NumberFormat taskIdFormat = NumberFormat.getInstance();
- private static final NumberFormat stageIdFormat = NumberFormat.getInstance();
+ private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
+ private static final NumberFormat STAGE_ID_FORMAT = NumberFormat.getInstance();
static {
- taskIdFormat.setGroupingUsed(false);
- taskIdFormat.setMinimumIntegerDigits(6);
- stageIdFormat.setGroupingUsed(false);
- stageIdFormat.setMinimumIntegerDigits(4);
+ TASK_ID_FORMAT.setGroupingUsed(false);
+ TASK_ID_FORMAT.setMinimumIntegerDigits(6);
+ STAGE_ID_FORMAT.setGroupingUsed(false);
+ STAGE_ID_FORMAT.setMinimumIntegerDigits(4);
}
protected transient JobConf jobConf;
@@ -60,7 +60,7 @@ public abstract class HivePairFlatMapFun
StringBuilder taskAttemptIdBuilder = new StringBuilder("attempt_");
taskAttemptIdBuilder.append(System.currentTimeMillis())
.append("_")
- .append(stageIdFormat.format(TaskContext.get().stageId()))
+ .append(STAGE_ID_FORMAT.format(TaskContext.get().stageId()))
.append("_");
if (isMap()) {
@@ -71,7 +71,7 @@ public abstract class HivePairFlatMapFun
// Spark task attempt id is increased by Spark context instead of task, which may introduce
// unstable qtest output, since non Hive features depends on this, we always set it to 0 here.
- taskAttemptIdBuilder.append(taskIdFormat.format(TaskContext.get().partitionId()))
+ taskAttemptIdBuilder.append(TASK_ID_FORMAT.format(TaskContext.get().partitionId()))
.append("_0");
String taskAttemptIdStr = taskAttemptIdBuilder.toString();
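(For context: the renamed constants are plain java.text.NumberFormat instances configured for fixed-width, zero-padded output, which is what keeps the synthetic Hadoop task attempt ids stable across runs. A small runnable sketch of the formatting; the exact attempt-id layout below is inferred from the visible code, so treat it as an assumption.)

import java.text.NumberFormat;

public class AttemptIdSketch {
  private static final NumberFormat TASK_ID_FORMAT = NumberFormat.getInstance();
  private static final NumberFormat STAGE_ID_FORMAT = NumberFormat.getInstance();

  static {
    TASK_ID_FORMAT.setGroupingUsed(false);      // no "1,234" separators
    TASK_ID_FORMAT.setMinimumIntegerDigits(6);  // 7 -> "000007"
    STAGE_ID_FORMAT.setGroupingUsed(false);
    STAGE_ID_FORMAT.setMinimumIntegerDigits(4); // 3 -> "0003"
  }

  static String attemptId(long now, int stageId, boolean isMap, int partitionId) {
    return "attempt_" + now
        + "_" + STAGE_ID_FORMAT.format(stageId)
        + "_" + (isMap ? "m" : "r")
        + "_" + TASK_ID_FORMAT.format(partitionId)
        + "_0"; // attempt number pinned to 0 for stable qtest output
  }

  public static void main(String[] args) {
    // Prints: attempt_1420682411000_0003_m_000007_0
    System.out.println(attemptId(1420682411000L, 3, true, 7));
  }
}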
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Thu Jan 8 02:00:11 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
+
import scala.Tuple2;
import java.util.Iterator;
@@ -33,6 +34,7 @@ public class HiveReduceFunction extends
super(buffer, sparkReporter);
}
+ @SuppressWarnings("unchecked")
@Override
public Iterable<Tuple2<HiveKey, BytesWritable>>
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Thu Jan 8 02:00:11 2015
@@ -32,6 +32,7 @@ public class HiveReduceFunctionResultLis
/**
* Instantiate result set Iterable for Reduce function output.
*
+ * @param conf Hive configuration.
* @param inputIterator Input record iterator.
* @param reducer Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecReducer} instance.
*/
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClient.java Thu Jan 8 02:00:11 2015
@@ -34,12 +34,15 @@ public interface HiveSparkClient extends
* @return SparkJobRef could be used to track spark job progress and metrics.
* @throws Exception
*/
- public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+ SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception;
- public SparkConf getSparkConf();
+ /**
+ * @return spark configuration
+ */
+ SparkConf getSparkConf();
/**
- * Get the count of executors
+ * @return the number of executors
*/
- public int getExecutorCount() throws Exception;
+ int getExecutorCount() throws Exception;
}
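(For context: dropping "public" here is purely cosmetic — every method declared in a Java interface is implicitly public and abstract, so the modifier is redundant and checkstyle flags it. A one-line illustration with a hypothetical interface name:)

interface ClientSketch {
  int getExecutorCount() throws Exception; // same as "public abstract int ..."
}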
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveSparkClientFactory.java Thu Jan 8 02:00:11 2015
@@ -33,7 +33,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
public class HiveSparkClientFactory {
- protected static transient final Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
+ protected static final transient Log LOG = LogFactory.getLog(HiveSparkClientFactory.class);
private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
private static final String SPARK_DEFAULT_MASTER = "local";
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Thu Jan 8 02:00:11 2015
@@ -27,10 +27,8 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.mapred.JobConf;
-import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java Thu Jan 8 02:00:11 2015
@@ -56,7 +56,7 @@ public class LocalHiveSparkClient implem
private static final long serialVersionUID = 1L;
private static final String MR_JAR_PROPERTY = "tmpjars";
- protected static transient final Log LOG = LogFactory
+ protected static final transient Log LOG = LogFactory
.getLog(LocalHiveSparkClient.class);
private static final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
@@ -138,7 +138,7 @@ public class LocalHiveSparkClient implem
* At this point single SparkContext is used by more than one thread, so make this
* method synchronized.
*
- * TODO: This method can't remove a jar/resource from SparkContext. Looks like this is an
+ * This method can't remove a jar/resource from SparkContext. Looks like this is an
* issue we have to live with until multiple SparkContexts are supported in a single JVM.
*/
private synchronized void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java Thu Jan 8 02:00:11 2015
@@ -64,10 +64,10 @@ public class RemoteHiveSparkClient imple
private static final long serialVersionUID = 1L;
private static final String MR_JAR_PROPERTY = "tmpjars";
- protected static transient final Log LOG = LogFactory
+ protected static final transient Log LOG = LogFactory
.getLog(RemoteHiveSparkClient.class);
- private static transient final Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
+ private static final transient Splitter CSV_SPLITTER = Splitter.on(",").omitEmptyStrings();
private transient SparkClient remoteClient;
private transient SparkConf sparkConf;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Thu Jan 8 02:00:11 2015
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.log.PerfLogger;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -58,16 +59,16 @@ import java.util.List;
public class SparkMapRecordHandler extends SparkRecordHandler {
private static final String PLAN_KEY = "__MAP_PLAN__";
private MapOperator mo;
- public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
+ public static final Log LOG = LogFactory.getLog(SparkMapRecordHandler.class);
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
private ExecMapperContext execContext;
- public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
- isLogInfoEnabled = l4j.isInfoEnabled();
+ isLogInfoEnabled = LOG.isInfoEnabled();
ObjectCache cache = ObjectCacheFactory.getCache(job);
try {
@@ -90,7 +91,7 @@ public class SparkMapRecordHandler exten
// initialize map operator
mo.setChildren(job);
- l4j.info(mo.dump(0));
+ LOG.info(mo.dump(0));
// initialize map local work
localWork = mrwork.getMapRedLocalWork();
execContext.setLocalWork(localWork);
@@ -111,11 +112,11 @@ public class SparkMapRecordHandler exten
//The following code is for mapjoin
//initialize all the dummy ops
- l4j.info("Initializing dummy operator");
+ LOG.info("Initializing dummy operator");
List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
- for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
dummyOp.setExecContext(execContext);
- dummyOp.initialize(jc,null);
+ dummyOp.initialize(jc, null);
}
} catch (Throwable e) {
abort = true;
@@ -148,14 +149,14 @@ public class SparkMapRecordHandler exten
// Don't create a new object if we are already out of memory
throw (OutOfMemoryError) e;
} else {
- l4j.fatal(StringUtils.stringifyException(e));
+ LOG.fatal(StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
}
@Override
- public void processRow(Object key, Iterator values) throws IOException {
+ public <E> void processRow(Object key, Iterator<E> values) throws IOException {
throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
}
@@ -163,7 +164,7 @@ public class SparkMapRecordHandler exten
public void close() {
// No row was processed
if (oc == null) {
- l4j.trace("Close called. no row processed by map.");
+ LOG.trace("Close called. no row processed by map.");
}
// check if there are IOExceptions
@@ -177,10 +178,10 @@ public class SparkMapRecordHandler exten
mo.close(abort);
//for close the local work
- if(localWork != null){
+ if (localWork != null) {
List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
- for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps) {
dummyOp.close(abort);
}
}
@@ -195,8 +196,8 @@ public class SparkMapRecordHandler exten
} catch (Exception e) {
if (!abort) {
// signal new failure to map-reduce
- l4j.error("Hit error while closing operators - failing tree");
- throw new RuntimeException("Hive Runtime Error while closing operators", e);
+ LOG.error("Hit error while closing operators - failing tree");
+ throw new IllegalStateException("Error while closing operators", e);
}
} finally {
MapredContext.close();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java Thu Jan 8 02:00:11 2015
@@ -18,24 +18,29 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.io.IOException;
+import java.util.Iterator;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.*;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import java.io.IOException;
-import java.util.Iterator;
+import com.google.common.base.Preconditions;
/**
- * Copied from MergeFileMapper
+ * Copied from MergeFileMapper.
*
* As MergeFileMapper is very similar to ExecMapper, this class is
* very similar to SparkMapRecordHandler
@@ -43,13 +48,14 @@ import java.util.Iterator;
public class SparkMergeFileRecordHandler extends SparkRecordHandler {
private static final String PLAN_KEY = "__MAP_PLAN__";
- private static final Log l4j = LogFactory.getLog(SparkMergeFileRecordHandler.class);
+ private static final Log LOG = LogFactory.getLog(SparkMergeFileRecordHandler.class);
private Operator<? extends OperatorDesc> op;
- private AbstractFileMergeOperator mergeOp;
+ private AbstractFileMergeOperator<? extends FileMergeDesc> mergeOp;
private Object[] row;
+ @SuppressWarnings("unchecked")
@Override
- public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
super.init(job, output, reporter);
ObjectCache cache = ObjectCacheFactory.getCache(job);
@@ -70,22 +76,22 @@ public class SparkMergeFileRecordHandler
String alias = mergeFileWork.getAliasToWork().keySet().iterator().next();
op = mergeFileWork.getAliasToWork().get(alias);
if (op instanceof AbstractFileMergeOperator) {
- mergeOp = (AbstractFileMergeOperator) op;
+ mergeOp = (AbstractFileMergeOperator<? extends FileMergeDesc>) op;
mergeOp.initializeOp(jc);
row = new Object[2];
abort = false;
} else {
abort = true;
- throw new RuntimeException(
- "Merge file work's top operator should be an" +
- " instance of AbstractFileMergeOperator");
+ throw new IllegalStateException(
+ "Merge file work's top operator should be an"
+ + " instance of AbstractFileMergeOperator");
}
} else {
abort = true;
- throw new RuntimeException("Map work should be a merge file work.");
+ throw new IllegalStateException("Map work should be a merge file work.");
}
- l4j.info(mergeOp.dump(0));
+ LOG.info(mergeOp.dump(0));
} catch (HiveException e) {
abort = true;
throw new RuntimeException(e);
@@ -105,14 +111,14 @@ public class SparkMergeFileRecordHandler
}
@Override
- public void processRow(Object key, Iterator values) throws IOException {
+ public <E> void processRow(Object key, Iterator<E> values) throws IOException {
throw new UnsupportedOperationException("Do not support this method in "
+ this.getClass().getSimpleName());
}
@Override
public void close() {
- l4j.info("Closing Merge Operator " + mergeOp.getName());
+ LOG.info("Closing Merge Operator " + mergeOp.getName());
try {
mergeOp.closeOp(abort);
} catch (HiveException e) {
@@ -121,7 +127,7 @@ public class SparkMergeFileRecordHandler
}
@Override
- public boolean getDone() {
+ public boolean getDone() {
return mergeOp.getDone();
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java Thu Jan 8 02:00:11 2015
@@ -33,8 +33,9 @@ import org.apache.spark.api.java.JavaPai
import com.google.common.base.Preconditions;
+@SuppressWarnings("rawtypes")
public class SparkPlan {
- private final String CLASS_NAME = SparkPlan.class.getName();
+ private static final String CLASS_NAME = SparkPlan.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private final Set<SparkTran> rootTrans = new HashSet<SparkTran>();
@@ -43,7 +44,8 @@ public class SparkPlan {
private final Map<SparkTran, List<SparkTran>> invertedTransGraph = new HashMap<SparkTran, List<SparkTran>>();
private final Set<Integer> cachedRDDIds = new HashSet<Integer>();
- public JavaPairRDD<HiveKey, BytesWritable> generateGraph() throws IllegalStateException {
+ @SuppressWarnings("unchecked")
+ public JavaPairRDD<HiveKey, BytesWritable> generateGraph() {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_BUILD_RDD_GRAPH);
Map<SparkTran, JavaPairRDD<HiveKey, BytesWritable>> tranToOutputRDDMap
= new HashMap<SparkTran, JavaPairRDD<HiveKey, BytesWritable>>();
@@ -98,7 +100,7 @@ public class SparkPlan {
}
/**
- * This method returns a topologically sorted list of SparkTran
+ * This method returns a topologically sorted list of SparkTran.
*/
private List<SparkTran> getAllTrans() {
List<SparkTran> result = new LinkedList<SparkTran>();
@@ -135,7 +137,7 @@ public class SparkPlan {
* @param parent
* @param child
*/
- public void connect(SparkTran parent, SparkTran child) throws IllegalStateException {
+ public void connect(SparkTran parent, SparkTran child) {
if (getChildren(parent).contains(child)) {
throw new IllegalStateException("Connection already exists");
}
@@ -151,7 +153,7 @@ public class SparkPlan {
invertedTransGraph.get(child).add(parent);
}
- public List<SparkTran> getParents(SparkTran tran) throws IllegalStateException {
+ public List<SparkTran> getParents(SparkTran tran) {
if (!invertedTransGraph.containsKey(tran)) {
return new ArrayList<SparkTran>();
}
@@ -159,7 +161,7 @@ public class SparkPlan {
return invertedTransGraph.get(tran);
}
- public List<SparkTran> getChildren(SparkTran tran) throws IllegalStateException {
+ public List<SparkTran> getChildren(SparkTran tran) {
if (!transGraph.containsKey(tran)) {
return new ArrayList<SparkTran>();
}
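(For context: SparkPlan keeps the transformation DAG as two adjacency maps — parent-to-children and its inverse — and produces a topological order by emitting a node only once all of its parents have been emitted. A self-contained sketch of that bookkeeping, assuming an acyclic graph whose roots are all supplied; the node type and method bodies are illustrative, not the exact Hive code.)

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

class DagSketch<T> {
  private final Map<T, List<T>> transGraph = new HashMap<T, List<T>>();
  private final Map<T, List<T>> invertedTransGraph = new HashMap<T, List<T>>();

  List<T> getChildren(T node) {
    List<T> c = transGraph.get(node);
    return c == null ? new ArrayList<T>() : c;
  }

  List<T> getParents(T node) {
    List<T> p = invertedTransGraph.get(node);
    return p == null ? new ArrayList<T>() : p;
  }

  void connect(T parent, T child) {
    if (getChildren(parent).contains(child)) {
      throw new IllegalStateException("Connection already exists");
    }
    if (!transGraph.containsKey(parent)) {
      transGraph.put(parent, new LinkedList<T>());
    }
    if (!invertedTransGraph.containsKey(child)) {
      invertedTransGraph.put(child, new LinkedList<T>());
    }
    transGraph.get(parent).add(child);
    invertedTransGraph.get(child).add(parent);
  }

  /** Topological order: a node is emitted only after all of its parents. */
  List<T> topoSort(List<T> roots) {
    List<T> result = new LinkedList<T>();
    LinkedList<T> pending = new LinkedList<T>(roots);
    while (!pending.isEmpty()) {
      T node = pending.removeFirst();
      if (result.contains(node)) {
        continue;
      }
      if (result.containsAll(getParents(node))) {
        result.add(node);
        pending.addAll(getChildren(node));
      } else {
        pending.addLast(node); // revisit once remaining parents are emitted
      }
    }
    return result;
  }
}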
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Thu Jan 8 02:00:11 2015
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -53,9 +54,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-
+@SuppressWarnings("rawtypes")
public class SparkPlanGenerator {
- private final String CLASS_NAME = SparkPlanGenerator.class.getName();
+ private static final String CLASS_NAME = SparkPlanGenerator.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
@@ -131,8 +132,8 @@ public class SparkPlanGenerator {
sparkPlan.connect(workToTranMap.get(parentWork), result);
}
} else {
- throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, " +
- "but found " + work.getClass().getName());
+ throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, "
+ + "but found " + work.getClass().getName());
}
if (cloneToWork.containsKey(work)) {
@@ -142,7 +143,7 @@ public class SparkPlanGenerator {
return result;
}
- private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
+ private Class<?> getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
// MergeFileWork is sub-class of MapWork, we don't need to distinguish here
if (mWork.getInputformat() != null) {
HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT,
@@ -168,6 +169,7 @@ public class SparkPlanGenerator {
return inputFormatClass;
}
+ @SuppressWarnings("unchecked")
private MapInput generateMapInput(SparkPlan sparkPlan, MapWork mapWork)
throws Exception {
JobConf jobConf = cloneJobConf(mapWork);
@@ -209,11 +211,12 @@ public class SparkPlanGenerator {
reduceTran.setReduceFunction(reduceFunc);
return reduceTran;
} else {
- throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, " +
- "but found " + work.getClass().getName());
+ throw new IllegalStateException("AssertionError: expected either MapWork or ReduceWork, "
+ + "but found " + work.getClass().getName());
}
}
+ @SuppressWarnings({ "unchecked" })
private JobConf cloneJobConf(BaseWork work) throws Exception {
if (workToJobConf.containsKey(work)) {
return workToJobConf.get(work);
@@ -225,9 +228,8 @@ public class SparkPlanGenerator {
cloned.setPartitionerClass((Class<? extends Partitioner>)
(Class.forName(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER))));
} catch (ClassNotFoundException e) {
- String msg = "Could not find partitioner class: " + e.getMessage() +
- " which is specified by: " +
- HiveConf.ConfVars.HIVEPARTITIONER.varname;
+ String msg = "Could not find partitioner class: " + e.getMessage()
+ + " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname;
throw new IllegalArgumentException(msg, e);
}
if (work instanceof MapWork) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Thu Jan 8 02:00:11 2015
@@ -34,9 +34,9 @@ import java.util.Arrays;
import java.util.Iterator;
public abstract class SparkRecordHandler {
- protected final String CLASS_NAME = this.getClass().getName();
+ protected static final String CLASS_NAME = SparkRecordHandler.class.getName();
protected final PerfLogger perfLogger = PerfLogger.getPerfLogger();
- private final Log LOG = LogFactory.getLog(this.getClass());
+ private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class);
// used to log memory usage periodically
protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -48,14 +48,13 @@ public abstract class SparkRecordHandler
private long rowNumber = 0;
private long nextLogThreshold = 1;
- public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ public <K, V> void init(JobConf job, OutputCollector<K, V> output, Reporter reporter) throws Exception {
jc = job;
MapredContext.init(false, new JobConf(jc));
MapredContext.get().setReporter(reporter);
oc = output;
rp = reporter;
-// MapredContext.get().setReporter(reporter);
LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -78,7 +77,7 @@ public abstract class SparkRecordHandler
/**
* Process row with key and value collection.
*/
- public abstract void processRow(Object key, Iterator values) throws IOException;
+ public abstract <E> void processRow(Object key, Iterator<E> values) throws IOException;
/**
* Log processed row number and used memory info.
@@ -86,9 +85,9 @@ public abstract class SparkRecordHandler
protected void logMemoryInfo() {
rowNumber++;
if (rowNumber == nextLogThreshold) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
LOG.info("processing " + rowNumber
- + " rows: used memory = " + used_memory);
+ + " rows: used memory = " + usedMemory);
nextLogThreshold = getNextLogThreshold(rowNumber);
}
}
@@ -97,12 +96,12 @@ public abstract class SparkRecordHandler
public abstract boolean getDone();
/**
- * Log information to be logged at the end
+ * Log information to be logged at the end.
*/
protected void logCloseInfo() {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
LOG.info("processed " + rowNumber + " rows: used memory = "
- + used_memory);
+ + usedMemory);
}
private long getNextLogThreshold(long currentThreshold) {
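(For context: the renamed usedMemory locals feed a logging scheme worth noting — rather than logging every row, the handler logs at row 1 and then raises the threshold each time, so per-row overhead stays negligible on large inputs. A runnable sketch of the idea; the 10x growth factor below is an assumption, since the real increment lives in getNextLogThreshold().)

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;

public class MemoryLogSketch {
  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
  private long rowNumber = 0;
  private long nextLogThreshold = 1;

  void onRow() {
    rowNumber++;
    if (rowNumber == nextLogThreshold) {
      long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
      System.out.println("processing " + rowNumber
          + " rows: used memory = " + usedMemory);
      nextLogThreshold = getNextLogThreshold(rowNumber);
    }
  }

  private long getNextLogThreshold(long currentThreshold) {
    return currentThreshold * 10; // assumed growth factor
  }

  public static void main(String[] args) {
    MemoryLogSketch sketch = new MemoryLogSketch();
    for (int i = 0; i < 100000; i++) {
      sketch.onRow(); // logs at rows 1, 10, 100, 1000, ...
    }
  }
}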
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Thu Jan 8 02:00:11 2015
@@ -69,7 +69,7 @@ import org.apache.hadoop.util.StringUtil
* - Catch and handle errors during execution of the operators.
*
*/
-public class SparkReduceRecordHandler extends SparkRecordHandler{
+public class SparkReduceRecordHandler extends SparkRecordHandler {
private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
private static final String PLAN_KEY = "__REDUCE_PLAN__";
@@ -98,14 +98,15 @@ public class SparkReduceRecordHandler ex
private VectorizedRowBatch[] batches;
// number of columns pertaining to keys in a vectorized row batch
private int keysColumnOffset;
- private final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+ private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
private StructObjectInspector keyStructInspector;
private StructObjectInspector[] valueStructInspectors;
/* this is only used in the error code path */
private List<VectorExpressionWriter>[] valueStringWriters;
private MapredLocalWork localWork = null;
- public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ @SuppressWarnings("unchecked")
+ public void init(JobConf job, OutputCollector output, Reporter reporter) throws Exception {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_INIT_OPERATORS);
super.init(job, output, reporter);
@@ -240,7 +241,7 @@ public class SparkReduceRecordHandler ex
}
@Override
- public void processRow(Object key, Iterator values) throws IOException {
+ public <E> void processRow(Object key, Iterator<E> values) throws IOException {
if (reducer.getDone()) {
return;
}
@@ -305,7 +306,7 @@ public class SparkReduceRecordHandler ex
* @param values
* @return true if it is not done and can take more inputs
*/
- private boolean processKeyValues(Iterator values, byte tag) throws HiveException {
+ private <E> boolean processKeyValues(Iterator<E> values, byte tag) throws HiveException {
while (values.hasNext()) {
BytesWritable valueWritable = (BytesWritable) values.next();
try {
@@ -332,10 +333,10 @@ public class SparkReduceRecordHandler ex
try {
rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
} catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
+ rowString = "[Error getting row data with exception "
+ + StringUtils.stringifyException(e2) + " ]";
}
- throw new HiveException("Hive Runtime Error while processing row (tag="
+ throw new HiveException("Error while processing row (tag="
+ tag + ") " + rowString, e);
}
}
@@ -346,7 +347,7 @@ public class SparkReduceRecordHandler ex
* @param values
* @return true if it is not done and can take more inputs
*/
- private boolean processVectors(Iterator values, byte tag) throws HiveException {
+ private <E> boolean processVectors(Iterator<E> values, byte tag) throws HiveException {
VectorizedRowBatch batch = batches[tag];
batch.reset();
@@ -392,7 +393,7 @@ public class SparkReduceRecordHandler ex
rowString = "[Error getting row data with exception " + StringUtils.stringifyException(e2)
+ " ]";
}
- throw new HiveException("Hive Runtime Error while processing vector batch (tag=" + tag + ") "
+ throw new HiveException("Error while processing vector batch (tag=" + tag + ") "
+ rowString, e);
}
return true; // give me more
@@ -402,7 +403,7 @@ public class SparkReduceRecordHandler ex
try {
return inputValueDeserializer[tag].deserialize(valueWritable);
} catch (SerDeException e) {
- throw new HiveException("Hive Runtime Error: Unable to deserialize reduce input value (tag="
+ throw new HiveException("Error: Unable to deserialize reduce input value (tag="
+ tag + ") from "
+ Utilities.formatBinaryString(valueWritable.getBytes(), 0, valueWritable.getLength())
+ " with properties " + valueTableDesc[tag].getProperties(), e);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java Thu Jan 8 02:00:11 2015
@@ -67,7 +67,7 @@ public class SparkReporter implements Re
}
@Override
- public InputSplit getInputSplit() throws UnsupportedOperationException {
+ public InputSplit getInputSplit() {
throw new UnsupportedOperationException("do not support this method now.");
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Thu Jan 8 02:00:11 2015
@@ -28,7 +28,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -47,14 +46,13 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup;
import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hive.spark.counter.SparkCounters;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager;
import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -70,23 +68,20 @@ import org.apache.hadoop.hive.ql.plan.St
import org.apache.hadoop.hive.ql.plan.UnionWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.spark.counter.SparkCounters;
import com.google.common.collect.Lists;
public class SparkTask extends Task<SparkWork> {
- private final String CLASS_NAME = SparkTask.class.getName();
+ private static final String CLASS_NAME = SparkTask.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
private static final long serialVersionUID = 1L;
- private transient JobConf job;
- private transient ContentSummary inputSummary;
private SparkCounters sparkCounters;
@Override
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
- job = new JobConf(conf, SparkTask.class);
}
@Override
@@ -133,7 +128,7 @@ public class SparkTask extends Task<Spar
rc = close(rc);
try {
sparkSessionManager.returnSession(sparkSession);
- } catch(HiveException ex) {
+ } catch (HiveException ex) {
LOG.error("Failed to return the session to SessionManager", ex);
}
}
@@ -155,7 +150,7 @@ public class SparkTask extends Task<Spar
}
/**
- * close will move the temp files into the right place for the fetch
+ * Close will move the temp files into the right place for the fetch
* task. If the job has failed it will clean up the files.
*/
private int close(int rc) {
@@ -211,7 +206,7 @@ public class SparkTask extends Task<Spar
}
}
if (candidate) {
- result.add((MapWork)w);
+ result.add((MapWork) w);
}
}
}
@@ -316,11 +311,11 @@ public class SparkTask extends Task<Spar
}
for (Task<? extends Serializable> task : childTasks) {
if (task instanceof StatsTask) {
- return (StatsTask)task;
+ return (StatsTask) task;
} else {
Task<? extends Serializable> childTask = getStatsTaskInChildTasks(task);
if (childTask instanceof StatsTask) {
- return (StatsTask)childTask;
+ return (StatsTask) childTask;
} else {
continue;
}
@@ -383,7 +378,7 @@ public class SparkTask extends Task<Spar
}
SparkWork sparkWork = this.getWork();
for (BaseWork work : sparkWork.getAllWork()) {
- for (Operator operator : work.getAllOperators()) {
+ for (Operator<? extends OperatorDesc> operator : work.getAllOperators()) {
if (operator instanceof FileSinkOperator) {
for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
hiveCounters.add(counter.toString());
@@ -392,11 +387,11 @@ public class SparkTask extends Task<Spar
for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
hiveCounters.add(counter.toString());
}
- }else if (operator instanceof ScriptOperator) {
+ } else if (operator instanceof ScriptOperator) {
for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
hiveCounters.add(counter.toString());
}
- }else if (operator instanceof JoinOperator) {
+ } else if (operator instanceof JoinOperator) {
for (JoinOperator.SkewkeyTableCounter counter : JoinOperator.SkewkeyTableCounter.values()) {
hiveCounters.add(counter.toString());
}
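
Most of the SparkTask hunks are mechanical: the unused JobConf and ContentSummary fields go away along with their imports, casts gain a space, and "}else if" becomes "} else if". The CLASS_NAME change carries the one real lesson: a value derived only from the class should be static final, so a single copy is shared by all instances and the ALL_CAPS name matches the usual Java constant convention. A small sketch, not taken from the commit:

    // Illustrative only: the constant is computed once at class load and
    // shared; a per-instance "private final" copy would store a redundant
    // reference in every instance while suggesting the value can vary.
    class ConstantDemo {
        private static final String CLASS_NAME = ConstantDemo.class.getName();

        String describe() {
            return "perf-logging under " + CLASS_NAME;
        }
    }
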
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Thu Jan 8 02:00:11 2015
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.io.WritableComparable;
import org.apache.spark.api.java.JavaPairRDD;
+@SuppressWarnings("rawtypes")
public interface SparkTran<KI extends WritableComparable, VI, KO extends WritableComparable, VO> {
JavaPairRDD<KO, VO> transform(
JavaPairRDD<KI, VI> input);
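
The new annotation targets the type-parameter bounds: WritableComparable is itself generic (WritableComparable<T>), so using it raw as a bound triggers a rawtypes warning wherever the interface appears. Suppressing once at the declaration is the least intrusive fix short of parameterizing every bound. An illustrative sketch, not from the commit:

    import org.apache.hadoop.io.WritableComparable;

    // Illustrative only: the raw WritableComparable bound would otherwise
    // emit a rawtypes warning on every use of this interface.
    @SuppressWarnings("rawtypes")
    interface KeyedTransform<KI extends WritableComparable, VI> {
        VI apply(KI key, VI value);
    }
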
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Thu Jan 8 02:00:11 2015
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.io.BytesWritable;
/**
- * Contains utilities methods used as part of Spark tasks
+ * Contains utility methods used as part of Spark tasks.
*/
public class SparkUtilities {
@@ -76,7 +76,7 @@ public class SparkUtilities {
SparkSession sparkSession = SessionState.get().getSparkSession();
// Spark configurations are updated close the existing session
- if(conf.getSparkConfigUpdated()){
+ if (conf.getSparkConfigUpdated()) {
sparkSessionManager.closeSession(sparkSession);
sparkSession = null;
conf.setSparkConfigUpdated(false);
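
The hunk above is only a brace-spacing fix, but the surrounding logic is a reusable pattern: when the Spark configuration has changed, close the cached session and null it so the next lookup opens a fresh one against the new settings. A hedged, self-contained sketch with invented names (Session, create, refresh; this is not the Hive API):

    // Hypothetical sketch of the refresh-on-config-change pattern above.
    final class SessionRefreshDemo {
        interface Session { void close(); }

        private static boolean configUpdated = true;

        static Session create() {
            return () -> System.out.println("session closed");
        }

        static Session refresh(Session current) {
            if (configUpdated) {           // mirrors conf.getSparkConfigUpdated()
                if (current != null) {
                    current.close();       // mirrors closeSession(sparkSession)
                }
                current = null;            // force re-creation below
                configUpdated = false;     // mirrors conf.setSparkConfigUpdated(false)
            }
            return current != null ? current : create();
        }

        public static void main(String[] args) {
            Session first = refresh(null);      // flag set: builds a fresh session
            Session second = refresh(first);    // flag cleared: same session returned
            System.out.println(first == second); // prints true
        }
    }
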
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSession.java Thu Jan 8 02:00:11 2015
@@ -28,15 +28,17 @@ import scala.Tuple2;
public interface SparkSession {
/**
* Initializes a Spark session for DAG execution.
+ * @param conf Hive configuration.
*/
- public void open(HiveConf conf) throws HiveException;
+ void open(HiveConf conf) throws HiveException;
/**
- * Submit given <i>sparkWork</i> to SparkClient
+ * Submit given <i>sparkWork</i> to SparkClient.
* @param driverContext
* @param sparkWork
+ * @return SparkJobRef
*/
- public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
+ SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception;
/**
* Get Spark shuffle memory per task, and total number of cores. This
@@ -45,25 +47,25 @@ public interface SparkSession {
* @return a tuple, the first element is the shuffle memory per task in bytes,
* the second element is the number of total cores usable by the client
*/
- public Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
+ Tuple2<Long, Integer> getMemoryAndCores() throws Exception;
/**
- * Is the session open and ready to submit jobs?
+ * @return true if the session is open and ready to submit jobs.
*/
- public boolean isOpen();
+ boolean isOpen();
/**
- * Return configuration.
+ * @return configuration.
*/
- public HiveConf getConf();
+ HiveConf getConf();
/**
- * Return session id.
+ * @return session id.
*/
- public String getSessionId();
+ String getSessionId();
/**
- * Close session and release resources
+ * Close session and release resources.
*/
- public void close();
+ void close();
}
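
Beyond the punctuation and @return fixes, every method above lost its public modifier: interface members are implicitly public abstract, so the keyword is redundant (this is what checkstyle's RedundantModifier rule flags). Implementations still need the explicit modifier, as this small sketch, not from the commit, shows:

    interface Greeter {
        String greet(String name);           // implicitly public abstract
    }

    class ConsoleGreeter implements Greeter {
        @Override
        public String greet(String name) {   // "public" is required here,
            return "hello, " + name;         // or the override narrows access
        }
    }
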
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManager.java Thu Jan 8 02:00:11 2015
@@ -30,7 +30,7 @@ public interface SparkSessionManager {
*
* @param hiveConf
*/
- public void setup(HiveConf hiveConf) throws HiveException;
+ void setup(HiveConf hiveConf) throws HiveException;
/**
* Get a valid SparkSession. First try to check if existing session is reusable
@@ -40,9 +40,9 @@ public interface SparkSessionManager {
* @param existingSession Existing session (can be null)
* @param conf
* @param doOpen Should the session be opened before returning?
- * @return
+ * @return SparkSession
*/
- public SparkSession getSession(SparkSession existingSession, HiveConf conf,
+ SparkSession getSession(SparkSession existingSession, HiveConf conf,
boolean doOpen) throws HiveException;
/**
@@ -50,16 +50,16 @@ public interface SparkSessionManager {
* still holds references to session and may want to reuse it in future.
* When client wants to reuse the session, it should pass the it <i>getSession</i> method.
*/
- public void returnSession(SparkSession sparkSession) throws HiveException;
+ void returnSession(SparkSession sparkSession) throws HiveException;
/**
* Close the given session and return it to pool. This is used when the client
* no longer needs a SparkSession.
*/
- public void closeSession(SparkSession sparkSession) throws HiveException;
+ void closeSession(SparkSession sparkSession) throws HiveException;
/**
* Shutdown the session manager. Also closing up SparkSessions in pool.
*/
- public void shutdown();
+ void shutdown();
}
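
The same redundant-modifier cleanup applies here. Since the interface also defines a pooling lifecycle that is easy to misread, a hedged usage fragment may help (error handling elided, hiveConf assumed in scope; getInstance comes from SparkSessionManagerImpl below):

    // Hedged usage sketch of the lifecycle: setup, borrow, return, shutdown.
    SparkSessionManager mgr = SparkSessionManagerImpl.getInstance();
    mgr.setup(hiveConf);
    SparkSession session = mgr.getSession(null, hiveConf, true); // open a new session
    try {
        // submit SparkWork through the session here
    } finally {
        mgr.returnSession(session);  // caller may later reuse it via getSession
    }
    mgr.shutdown();                  // closes any sessions still in the pool
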
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionManagerImpl.java Thu Jan 8 02:00:11 2015
@@ -64,7 +64,7 @@ public class SparkSessionManagerImpl imp
});
}
- public synchronized static SparkSessionManagerImpl getInstance()
+ public static synchronized SparkSessionManagerImpl getInstance()
throws HiveException {
if (instance == null) {
instance = new SparkSessionManagerImpl();
@@ -142,12 +142,12 @@ public class SparkSessionManagerImpl imp
UserGroupInformation newUgi = Utils.getUGI();
String newUserName = newUgi.getShortUserName();
- // TODOD this we need to store the session username somewhere else as getUGIForConf never used the conf
+ // TODO: we need to store the session user name somewhere else, as getUGIForConf never used the conf
UserGroupInformation ugiInSession = Utils.getUGI();
String userNameInSession = ugiInSession.getShortUserName();
return newUserName.equals(userNameInSession);
- } catch(Exception ex) {
+ } catch (Exception ex) {
throw new HiveException("Failed to get user info from HiveConf.", ex);
}
}
@@ -175,7 +175,7 @@ public class SparkSessionManagerImpl imp
public void shutdown() {
LOG.info("Closing the session manager.");
if (createdSessions != null) {
- synchronized(createdSessions) {
+ synchronized (createdSessions) {
Iterator<SparkSession> it = createdSessions.iterator();
while (it.hasNext()) {
SparkSession session = it.next();
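
The getInstance change is purely about modifier order: "synchronized static" and "static synchronized" compile identically, but the Java Language Specification's canonical ordering, which checkstyle's ModifierOrder rule enforces, puts static before synchronized. An illustrative sketch:

    class Singleton {
        private static Singleton instance;

        // Preferred order: access modifier, then static, then synchronized.
        public static synchronized Singleton getInstance() {
            if (instance == null) {
                instance = new Singleton();
            }
            return instance;
        }
    }
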
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java Thu Jan 8 02:00:11 2015
@@ -74,8 +74,8 @@ public class SparkJobMonitor {
if (LOG.isDebugEnabled()) {
console.printInfo("state = " + state);
}
- if (state != null && state != JobExecutionStatus.UNKNOWN &&
- (state != lastState || state == JobExecutionStatus.RUNNING)) {
+ if (state != null && state != JobExecutionStatus.UNKNOWN
+ && (state != lastState || state == JobExecutionStatus.RUNNING)) {
lastState = state;
Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
@@ -84,19 +84,19 @@ public class SparkJobMonitor {
if (!running) {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING);
// print job stages.
- console.printInfo("\nQuery Hive on Spark job[" +
- sparkJobStatus.getJobId() + "] stages:");
+ console.printInfo("\nQuery Hive on Spark job["
+ + sparkJobStatus.getJobId() + "] stages:");
for (int stageId : sparkJobStatus.getStageIds()) {
console.printInfo(Integer.toString(stageId));
}
- console.printInfo("\nStatus: Running (Hive on Spark job[" +
- sparkJobStatus.getJobId() + "])");
+ console.printInfo("\nStatus: Running (Hive on Spark job["
+ + sparkJobStatus.getJobId() + "])");
startTime = System.currentTimeMillis();
running = true;
- console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " +
- "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
+ console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: "
+ + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
}
@@ -110,8 +110,8 @@ public class SparkJobMonitor {
console.printInfo("Status: Finished successfully within a check interval.");
} else {
double duration = (System.currentTimeMillis() - startTime) / 1000.0;
- console.printInfo("Status: Finished successfully in " +
- String.format("%.2f seconds", duration));
+ console.printInfo("Status: Finished successfully in "
+ + String.format("%.2f seconds", duration));
}
running = false;
done = true;
@@ -122,6 +122,12 @@ public class SparkJobMonitor {
done = true;
rc = 2;
break;
+ case UNKNOWN:
+ console.printError("Status: Unknown");
+ running = false;
+ done = true;
+ rc = 2;
+ break;
}
}
if (!done) {
@@ -231,11 +237,7 @@ public class SparkJobMonitor {
}
if (progressMap.isEmpty()) {
- if (lastProgressMap.isEmpty()) {
- return true;
- } else {
- return false;
- }
+ return lastProgressMap.isEmpty();
} else {
if (lastProgressMap.isEmpty()) {
return false;
@@ -244,8 +246,8 @@ public class SparkJobMonitor {
return false;
}
for (String key : progressMap.keySet()) {
- if (!lastProgressMap.containsKey(key) ||
- !progressMap.get(key).equals(lastProgressMap.get(key))) {
+ if (!lastProgressMap.containsKey(key)
+ || !progressMap.get(key).equals(lastProgressMap.get(key))) {
return false;
}
}
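
Three distinct cleanups meet in this file: wrapped boolean and string operators now begin the continuation line (the OperatorWrap convention), the status switch gains an explicit UNKNOWN arm instead of silently doing nothing, and an if/else that returned boolean literals collapses into returning the condition. The last one generalizes; a small sketch, not from the commit:

    // Illustrative only: return the condition instead of branching to the
    // literals true and false.
    class ProgressCompare {
        static boolean bothEmpty(java.util.Map<?, ?> current, java.util.Map<?, ?> last) {
            if (current.isEmpty()) {
                return last.isEmpty(); // was: if (last.isEmpty()) return true; else return false;
            }
            return false;              // equivalent: return current.isEmpty() && last.isEmpty();
        }
    }
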
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java Thu Jan 8 02:00:11 2015
@@ -23,7 +23,7 @@ public class SparkJobRef {
private SparkJobStatus sparkJobStatus;
- public SparkJobRef() {}
+ public SparkJobRef() { }
public SparkJobRef(String jobId) {
this.jobId = jobId;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java Thu Jan 8 02:00:11 2015
@@ -28,17 +28,17 @@ import java.util.Map;
*/
public interface SparkJobStatus {
- public int getJobId();
+ int getJobId();
- public JobExecutionStatus getState();
+ JobExecutionStatus getState();
- public int[] getStageIds();
+ int[] getStageIds();
- public Map<String, SparkStageProgress> getSparkStageProgress();
+ Map<String, SparkStageProgress> getSparkStageProgress();
- public SparkCounters getCounter();
+ SparkCounters getCounter();
- public SparkStatistics getSparkStatistics();
+ SparkStatistics getSparkStatistics();
- public void cleanup();
+ void cleanup();
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java Thu Jan 8 02:00:11 2015
@@ -23,10 +23,6 @@ public class SparkStageProgress {
private int succeededTaskCount;
private int runningTaskCount;
private int failedTaskCount;
- // TODO: remove the following two metrics as they're not available in current spark API,
- // we can add them back once spark provides it
-// private int killedTaskCount;
-// private long cumulativeTime;
public SparkStageProgress(
int totalTaskCount,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobMetricsListener.java Thu Jan 8 02:00:11 2015
@@ -21,13 +21,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState;
import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.scheduler.JobSucceeded;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
@@ -44,9 +40,12 @@ import org.apache.spark.scheduler.SparkL
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
public class JobMetricsListener implements SparkListener {
- private final static Log LOG = LogFactory.getLog(JobMetricsListener.class);
+ private static final Log LOG = LogFactory.getLog(JobMetricsListener.class);
private final Map<Integer, int[]> jobIdToStageId = Maps.newHashMap();
private final Map<Integer, Integer> stageIdToJobId = Maps.newHashMap();
@@ -100,7 +99,7 @@ public class JobMetricsListener implemen
int jobId = jobStart.jobId();
int size = jobStart.stageIds().size();
int[] intStageIds = new int[size];
- for(int i=0; i< size; i++) {
+ for (int i = 0; i < size; i++) {
Integer stageId = (Integer) jobStart.stageIds().apply(i);
intStageIds[i] = stageId;
stageIdToJobId.put(stageId, jobId);
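
Besides the import reshuffle (third-party com.google imports grouped after org.* with a blank line) and the "private static final" reordering, the reformatted loop is the standard way to copy Spark's Scala Seq of stage IDs into a Java int[]. A hedged restatement of what the hunk does, with the stageIds variable assumed in scope:

    // Seq.apply(i) is Scala's positional accessor; the boxed Integer is
    // unboxed into the primitive array slot.
    int size = stageIds.size();           // scala.collection.Seq<Object>
    int[] intStageIds = new int[size];
    for (int i = 0; i < size; i++) {
        intStageIds[i] = (Integer) stageIds.apply(i);
    }
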
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java Thu Jan 8 02:00:11 2015
@@ -99,8 +99,8 @@ public class LocalSparkJobStatus impleme
int totalTaskCount = sparkStageInfo.numTasks();
SparkStageProgress sparkStageProgress = new SparkStageProgress(
totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
- stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
- sparkStageInfo.currentAttemptId(), sparkStageProgress);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+ + sparkStageInfo.currentAttemptId(), sparkStageProgress);
}
}
return stageProgresses;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java Thu Jan 8 02:00:11 2015
@@ -94,8 +94,8 @@ public class RemoteSparkJobStatus implem
int totalTaskCount = sparkStageInfo.numTasks();
SparkStageProgress sparkStageProgress = new SparkStageProgress(
totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
- stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
- sparkStageInfo.currentAttemptId(), sparkStageProgress);
+ stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_"
+ + sparkStageInfo.currentAttemptId(), sparkStageProgress);
}
}
return stageProgresses;
@@ -132,8 +132,8 @@ public class RemoteSparkJobStatus implem
}
private SparkJobInfo getSparkJobInfo() {
- Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ?
- jobHandle.getSparkJobIds().get(0) : null;
+ Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
+ ? jobHandle.getSparkJobIds().get(0) : null;
if (sparkJobId == null) {
long duration = TimeUnit.MILLISECONDS.convert(
System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java Thu Jan 8 02:00:11 2015
@@ -18,17 +18,16 @@
package org.apache.hadoop.hive.ql.lib;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-
import java.util.Stack;
-import java.util.regex.Matcher;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
* Rule that matches a particular type of node.
*/
public class TypeRule implements Rule {
- private Class nodeClass;
+ private Class<?> nodeClass;
public TypeRule(Class<?> nodeClass) {
this.nodeClass = nodeClass;
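
Parameterizing the field as Class<?> matches the constructor, which already took Class<?>, and removes the rawtypes warning without changing behavior; the wildcard simply says "a class of some unknown type". An illustrative sketch, not from the commit:

    // Illustrative only: isInstance works identically on raw Class and
    // Class<?>, but only the latter is warning-free.
    class TypeMatcher {
        private final Class<?> nodeClass;

        TypeMatcher(Class<?> nodeClass) {
            this.nodeClass = nodeClass;
        }

        boolean matches(Object node) {
            return nodeClass.isInstance(node);
        }
    }
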
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1650201&r1=1650200&r2=1650201&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Thu Jan 8 02:00:11 2015
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.optimizer.physical;
import com.google.common.base.Preconditions;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -79,6 +80,7 @@ public class GenSparkSkewJoinProcessor {
// prevent instantiation
}
+ @SuppressWarnings("unchecked")
public static void processSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException {
@@ -138,7 +140,7 @@ public class GenSparkSkewJoinProcessor {
// used for create mapJoinDesc, should be in order
List<TableDesc> newJoinValueTblDesc = new ArrayList<TableDesc>();
- for (Byte tag : tags) {
+ for (int i = 0; i < tags.length; i++) {
newJoinValueTblDesc.add(null);
}
@@ -231,14 +233,14 @@ public class GenSparkSkewJoinProcessor {
for (int k = 0; k < tags.length; k++) {
Operator<? extends OperatorDesc> ts =
GenMapRedUtils.createTemporaryTableScanOperator(rowSchemaList.get((byte) k));
- ((TableScanOperator)ts).setTableDesc(tableDescList.get((byte)k));
+ ((TableScanOperator) ts).setTableDesc(tableDescList.get((byte) k));
parentOps[k] = ts;
}
// create the MapJoinOperator
- String dumpFilePrefix = "mapfile"+ PlanUtils.getCountForMapJoinDumpFilePrefix();
+ String dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
MapJoinDesc mapJoinDescriptor = new MapJoinDesc(newJoinKeys, keyTblDesc,
- newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
+ newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
.getOutputColumnNames(), i, joinDescriptor.getConds(),
joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix);
mapJoinDescriptor.setTagOrder(tags);
@@ -307,7 +309,7 @@ public class GenSparkSkewJoinProcessor {
for (BaseWork work : sparkWork.getRoots()) {
Preconditions.checkArgument(work instanceof MapWork,
"All root work should be MapWork, but got " + work.getClass().getSimpleName());
- if(work != bigMapWork) {
+ if (work != bigMapWork) {
sparkWork.connect(work, bigMapWork,
new SparkEdgeProperty(SparkEdgeProperty.SHUFFLE_NONE));
}
@@ -351,8 +353,9 @@ public class GenSparkSkewJoinProcessor {
}
/**
- * Insert SparkHashTableSink and HashTableDummy between small dir TS and MJ
+ * Insert SparkHashTableSink and HashTableDummy between small dir TS and MJ.
*/
+ @SuppressWarnings("unchecked")
private static void insertSHTS(byte tag, TableScanOperator tableScan, MapWork bigMapWork) {
Preconditions.checkArgument(tableScan.getChildOperators().size() == 1
&& tableScan.getChildOperators().get(0) instanceof MapJoinOperator);
@@ -419,7 +422,7 @@ public class GenSparkSkewJoinProcessor {
public static boolean supportRuntimeSkewJoin(JoinOperator joinOp,
Task<? extends Serializable> currTask, HiveConf hiveConf) {
List<Task<? extends Serializable>> children = currTask.getChildTasks();
- return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) &&
- (children == null || children.size() <= 1);
+ return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp)
+ && (children == null || children.size() <= 1);
}
}
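
The loop change near the top of this file is the subtle one: the removed foreach never used its loop variable, it only ran tags.length times to pad the list with nulls, so the indexed form states that intent plainly. An equally valid alternative, not what the commit does, would pre-fill the placeholder list in one call (assuming the same List, ArrayList, and TableDesc types as the surrounding code):

    // Hedged alternative sketch: nCopies builds an immutable list of
    // tags.length nulls; the ArrayList copy makes it mutable again.
    List<TableDesc> newJoinValueTblDesc =
        new ArrayList<TableDesc>(java.util.Collections.nCopies(tags.length, (TableDesc) null));
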