You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2015/10/13 01:21:46 UTC
[2/2] hive git commit: HIVE-12065 : FS stats collection may generate
incorrect stats for multi-insert query (Ashutosh Chauhan via Pengcheng Xiong)
HIVE-12065 : FS stats collection may generate incorrect stats for multi-insert query (Ashutosh Chauhan via Pengcheng Xiong)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9b4826e7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9b4826e7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9b4826e7
Branch: refs/heads/master
Commit: 9b4826e765423c4766dbac052127546213ac0752
Parents: b97fdc0
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Wed Oct 7 17:45:50 2015 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Mon Oct 12 16:21:05 2015 -0700
----------------------------------------------------------------------
.../hive/ql/stats/DummyStatsAggregator.java | 12 ++--
.../hive/ql/stats/DummyStatsPublisher.java | 15 +++--
.../ql/stats/KeyVerifyingStatsAggregator.java | 10 +--
.../hadoop/hive/ql/exec/FileSinkOperator.java | 8 ++-
.../apache/hadoop/hive/ql/exec/StatsTask.java | 31 ++++++---
.../hadoop/hive/ql/exec/TableScanOperator.java | 7 +-
.../apache/hadoop/hive/ql/exec/Utilities.java | 31 ++++++++-
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 14 +++-
.../hive/ql/exec/spark/SparkPlanGenerator.java | 16 +++--
.../hadoop/hive/ql/exec/tez/DagUtils.java | 5 +-
.../hive/ql/index/AggregateIndexHandler.java | 1 -
.../hive/ql/index/TableBasedIndexHandler.java | 7 --
.../ql/index/bitmap/BitmapIndexHandler.java | 1 -
.../ql/index/compact/CompactIndexHandler.java | 1 -
.../ql/io/rcfile/stats/PartialScanMapper.java | 7 +-
.../ql/io/rcfile/stats/PartialScanTask.java | 11 +++-
.../ql/io/rcfile/stats/PartialScanWork.java | 14 ++++
.../hive/ql/optimizer/GenMRTableScan1.java | 3 +
.../hive/ql/optimizer/GenMapRedUtils.java | 2 +-
.../hive/ql/parse/ProcessAnalyzeTable.java | 4 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +--
.../parse/spark/SparkProcessAnalyzeTable.java | 2 +
.../hadoop/hive/ql/plan/FileSinkDesc.java | 16 ++++-
.../apache/hadoop/hive/ql/plan/StatsWork.java | 15 +++--
.../hadoop/hive/ql/plan/TableScanDesc.java | 12 +++-
.../hive/ql/stats/CounterStatsAggregator.java | 8 +--
.../ql/stats/CounterStatsAggregatorSpark.java | 6 +-
.../ql/stats/CounterStatsAggregatorTez.java | 10 ++-
.../hive/ql/stats/CounterStatsPublisher.java | 7 +-
.../hadoop/hive/ql/stats/StatsAggregator.java | 7 +-
.../hive/ql/stats/StatsCollectionContext.java | 63 ++++++++++++++++++
.../hadoop/hive/ql/stats/StatsPublisher.java | 8 +--
.../hive/ql/stats/fs/FSStatsAggregator.java | 23 ++++---
.../hive/ql/stats/fs/FSStatsPublisher.java | 32 +++++----
.../hive/ql/stats/jdbc/JDBCStatsAggregator.java | 18 +++---
.../hive/ql/stats/jdbc/JDBCStatsPublisher.java | 22 ++++---
.../hive/ql/exec/TestFileSinkOperator.java | 13 ++--
.../ql/exec/TestStatsPublisherEnhanced.java | 61 ++++++++++--------
.../infer_bucket_sort_multi_insert.q | 1 +
.../test/queries/clientpositive/multi_insert.q | 2 +-
.../queries/clientpositive/multi_insert_gby2.q | 2 +-
.../queries/clientpositive/multi_insert_gby3.q | 2 +-
.../clientpositive/multi_insert_lateral_view.q | 1 +
.../queries/clientpositive/multi_insert_mixed.q | 2 +-
...multi_insert_move_tasks_share_dependencies.q | 2 +-
.../clientpositive/multi_insert_union_src.q | 2 +-
.../spark/column_access_stats.q.out | 46 ++++++-------
.../test/results/clientpositive/spark/pcr.q.out | 16 ++---
.../clientpositive/spark/ppd_join5.q.out | 58 ++++++++---------
.../clientpositive/spark/smb_mapjoin_12.q.out | 6 +-
.../clientpositive/spark/smb_mapjoin_13.q.out | 36 +++++------
.../clientpositive/spark/smb_mapjoin_15.q.out | 12 ++--
.../clientpositive/spark/smb_mapjoin_16.q.out | 2 +-
.../results/clientpositive/spark/union34.q.out | 68 ++++++++++----------
54 files changed, 486 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java
index 327eabc..eb3f6eb 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsAggregator.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hive.ql.stats;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
/**
* A test implementation for StatsAggregator.
@@ -34,8 +32,9 @@ public class DummyStatsAggregator implements StatsAggregator {
// This is a test. The parameter hive.test.dummystats.aggregator's value
// denotes the method which needs to throw an error.
- public boolean connect(Configuration hconf, Task sourceTask) {
- errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR);
+ @Override
+ public boolean connect(StatsCollectionContext scc) {
+ errorMethod = HiveConf.getVar(scc.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATAGGR);
if (errorMethod.equalsIgnoreCase("connect")) {
return false;
}
@@ -43,17 +42,20 @@ public class DummyStatsAggregator implements StatsAggregator {
return true;
}
+ @Override
public String aggregateStats(String keyPrefix, String statType) {
return null;
}
- public boolean closeConnection() {
+ @Override
+ public boolean closeConnection(StatsCollectionContext scc) {
if (errorMethod.equalsIgnoreCase("closeConnection")) {
return false;
}
return true;
}
+ @Override
public boolean cleanUp(String keyPrefix) {
if (errorMethod.equalsIgnoreCase("cleanUp")) {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java
index 1f6e80f..9f1fdb4 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/DummyStatsPublisher.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.stats;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
/**
@@ -36,8 +35,9 @@ public class DummyStatsPublisher implements StatsPublisher {
// This is a test. The parameter hive.test.dummystats.publisher's value
// denotes the method which needs to throw an error.
- public boolean init(Configuration hconf) {
- errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
+ @Override
+ public boolean init(StatsCollectionContext context) {
+ errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
if (errorMethod.equalsIgnoreCase("init")) {
return false;
}
@@ -45,8 +45,9 @@ public class DummyStatsPublisher implements StatsPublisher {
return true;
}
- public boolean connect(Configuration hconf) {
- errorMethod = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
+ @Override
+ public boolean connect(StatsCollectionContext context) {
+ errorMethod = HiveConf.getVar(context.getHiveConf(), HiveConf.ConfVars.HIVETESTMODEDUMMYSTATPUB);
if (errorMethod.equalsIgnoreCase("connect")) {
return false;
}
@@ -54,6 +55,7 @@ public class DummyStatsPublisher implements StatsPublisher {
return true;
}
+ @Override
public boolean publishStat(String fileID, Map<String, String> stats) {
if (errorMethod.equalsIgnoreCase("publishStat")) {
return false;
@@ -61,7 +63,8 @@ public class DummyStatsPublisher implements StatsPublisher {
return true;
}
- public boolean closeConnection() {
+ @Override
+ public boolean closeConnection(StatsCollectionContext context) {
if (errorMethod.equalsIgnoreCase("closeConnection")) {
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
index cb0b584..4e00316 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/stats/KeyVerifyingStatsAggregator.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.stats;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -30,10 +28,12 @@ import org.apache.hadoop.hive.ql.session.SessionState;
public class KeyVerifyingStatsAggregator implements StatsAggregator {
- public boolean connect(Configuration hconf, Task sourceTask) {
+ @Override
+ public boolean connect(StatsCollectionContext scc) {
return true;
}
+ @Override
public String aggregateStats(String keyPrefix, String statType) {
SessionState ss = SessionState.get();
// Have to use the length instead of the actual prefix because the prefix is location dependent
@@ -43,10 +43,12 @@ public class KeyVerifyingStatsAggregator implements StatsAggregator {
return null;
}
- public boolean closeConnection() {
+ @Override
+ public boolean closeConnection(StatsCollectionContext scc) {
return true;
}
+ @Override
public boolean cleanUp(String keyPrefix) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index e247673..864c3d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -61,9 +61,9 @@ import org.apache.hadoop.hive.ql.plan.FileSinkDesc.DPSortState;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -1137,7 +1137,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
return;
}
- if (!statsPublisher.connect(hconf)) {
+ StatsCollectionContext sContext = new StatsCollectionContext(hconf);
+ sContext.setStatsTmpDir(conf.getStatsTmpDir());
+ if (!statsPublisher.connect(sContext)) {
// just return, stats gathering should not block the main query
LOG.error("StatsPublishing error: cannot connect to database");
if (isStatsReliable) {
@@ -1204,7 +1206,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
}
- if (!statsPublisher.closeConnection()) {
+ if (!statsPublisher.closeConnection(sContext)) {
// The original exception is lost.
// Not changing the interface to maintain backward compatibility
if (isStatsReliable) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 41ece04..9775645 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.util.StringUtils;
@@ -134,13 +135,14 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
StatsAggregator statsAggregator = null;
int ret = 0;
-
+ StatsCollectionContext scc = null;
try {
// Stats setup:
Warehouse wh = new Warehouse(conf);
if (!getWork().getNoStatsAggregator() && !getWork().isNoScanAnalyzeCommand()) {
try {
- statsAggregator = createStatsAggregator(conf);
+ scc = getContext();
+ statsAggregator = createStatsAggregator(scc);
} catch (HiveException e) {
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw e;
@@ -241,7 +243,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
}
} finally {
if (statsAggregator != null) {
- statsAggregator.closeConnection();
+ statsAggregator.closeConnection(scc);
}
}
// The return value of 0 indicates success,
@@ -268,7 +270,7 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
return prefix.toString();
}
- private StatsAggregator createStatsAggregator(HiveConf conf) throws HiveException {
+ private StatsAggregator createStatsAggregator(StatsCollectionContext scc) throws HiveException {
String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
StatsFactory factory = StatsFactory.newFactory(statsImpl, conf);
if (factory == null) {
@@ -277,21 +279,30 @@ public class StatsTask extends Task<StatsWork> implements Serializable {
// initialize stats publishing table for noscan which has only stats task
// the rest of MR task following stats task initializes it in ExecDriver.java
StatsPublisher statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (!statsPublisher.init(scc)) { // creating stats table if not exists
throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
}
- Task sourceTask = getWork().getSourceTask();
- if (sourceTask == null) {
- throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
- }
+
// manufacture a StatsAggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
- if (!statsAggregator.connect(conf, sourceTask)) {
+ if (!statsAggregator.connect(scc)) {
throw new HiveException(ErrorMsg.STATSAGGREGATOR_CONNECTION_ERROR.getErrorCodedMsg(statsImpl));
}
return statsAggregator;
}
+ private StatsCollectionContext getContext() throws HiveException {
+
+ StatsCollectionContext scc = new StatsCollectionContext(conf);
+ Task sourceTask = getWork().getSourceTask();
+ if (sourceTask == null) {
+ throw new HiveException(ErrorMsg.STATSAGGREGATOR_SOURCETASK_NULL.getErrorCodedMsg());
+ }
+ scc.setTask(sourceTask);
+ scc.setStatsTmpDir(this.getWork().getStatsTmpDir());
+ return scc;
+ }
+
private boolean existStats(Map<String, String> parameters) {
return parameters.containsKey(StatsSetupConst.ROW_COUNT)
|| parameters.containsKey(StatsSetupConst.NUM_FILES)
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
index cbf02e9..22f7520 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
@@ -282,7 +283,9 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
// Initializing a stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
- if (!statsPublisher.connect(jc)) {
+ StatsCollectionContext sc = new StatsCollectionContext(jc);
+ sc.setStatsTmpDir(conf.getTmpStatsDir());
+ if (!statsPublisher.connect(sc)) {
// just return, stats gathering should not block the main query.
if (isLogInfoEnabled) {
LOG.info("StatsPublishing error: cannot connect to database.");
@@ -318,7 +321,7 @@ public class TableScanOperator extends Operator<TableScanDesc> implements
LOG.info("publishing : " + key + " : " + statsToPublish.toString());
}
}
- if (!statsPublisher.closeConnection()) {
+ if (!statsPublisher.closeConnection(sc)) {
if (isStatsReliable) {
throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 5b21af9..b1ab1b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import java.beans.DefaultPersistenceDelegate;
import java.beans.Encoder;
import java.beans.ExceptionListener;
@@ -102,6 +100,7 @@ import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -160,6 +159,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -3933,4 +3933,31 @@ public final class Utilities {
}
}
+ public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) {
+
+ List<String> statsTmpDirs = new ArrayList<>();
+ if (!StatsSetupConst.StatDB.fs.name().equalsIgnoreCase(HiveConf.getVar(conf, ConfVars.HIVESTATSDBCLASS))) {
+ // no-op for non-fs stats collection
+ return statsTmpDirs;
+ }
+ // if it's auto-stats gathering for inserts or CTAS, stats dir will be in FileSink
+ Set<Operator<? extends OperatorDesc>> ops = work.getAllLeafOperators();
+ if (work instanceof MapWork) {
+ // if it's an analyze statement, stats dir will be in TableScan
+ ops.addAll(work.getAllRootOperators());
+ }
+ for (Operator<? extends OperatorDesc> op : ops) {
+ OperatorDesc desc = op.getConf();
+ String statsTmpDir = null;
+ if (desc instanceof FileSinkDesc) {
+ statsTmpDir = ((FileSinkDesc)desc).getStatsTmpDir();
+ } else if (desc instanceof TableScanDesc) {
+ statsTmpDir = ((TableScanDesc) desc).getTmpStatsDir();
+ }
+ if (statsTmpDir != null && !statsTmpDir.isEmpty()) {
+ statsTmpDirs.add(statsTmpDir);
+ }
+ }
+ return statsTmpDirs;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index d9225a9..b799a17 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -75,6 +76,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -243,7 +245,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
try {
String partitioner = HiveConf.getVar(job, ConfVars.HIVEPARTITIONER);
- job.setPartitionerClass((Class<? extends Partitioner>) JavaUtils.loadClass(partitioner));
+ job.setPartitionerClass(JavaUtils.loadClass(partitioner));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getMessage(), e);
}
@@ -289,7 +291,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
LOG.info("Using " + inpFormat);
try {
- job.setInputFormat((Class<? extends InputFormat>) JavaUtils.loadClass(inpFormat));
+ job.setInputFormat(JavaUtils.loadClass(inpFormat));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getMessage(), e);
}
@@ -408,7 +410,13 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
StatsFactory factory = StatsFactory.newFactory(job);
if (factory != null) {
statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(job)) { // creating stats table if not exists
+ List<String> statsTmpDir = Utilities.getStatsTmpDirs(mWork, job);
+ if (rWork != null) {
+ statsTmpDir.addAll(Utilities.getStatsTmpDirs(rWork, job));
+ }
+ StatsCollectionContext sc = new StatsCollectionContext(job);
+ sc.setStatsTmpDirs(statsTmpDir);
+ if (!statsPublisher.init(sc)) { // creating stats table if not exists
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 4c3ee4b..51e66ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.Writable;
@@ -65,11 +66,11 @@ public class SparkPlanGenerator {
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
- private JavaSparkContext sc;
+ private final JavaSparkContext sc;
private final JobConf jobConf;
- private Context context;
- private Path scratchDir;
- private SparkReporter sparkReporter;
+ private final Context context;
+ private final Path scratchDir;
+ private final SparkReporter sparkReporter;
private Map<BaseWork, BaseWork> cloneToWork;
private final Map<BaseWork, SparkTran> workToTranMap;
private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
@@ -270,8 +271,7 @@ public class SparkPlanGenerator {
// Make sure we'll use a different plan path from the original one
HiveConf.setVar(cloned, HiveConf.ConfVars.PLAN, "");
try {
- cloned.setPartitionerClass((Class<? extends Partitioner>)
- JavaUtils.loadClass(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER)));
+ cloned.setPartitionerClass(JavaUtils.loadClass(HiveConf.getVar(cloned, HiveConf.ConfVars.HIVEPARTITIONER)));
} catch (ClassNotFoundException e) {
String msg = "Could not find partitioner class: " + e.getMessage()
+ " which is specified by: " + HiveConf.ConfVars.HIVEPARTITIONER.varname;
@@ -315,7 +315,9 @@ public class SparkPlanGenerator {
StatsFactory factory = StatsFactory.newFactory(jobConf);
if (factory != null) {
statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
+ StatsCollectionContext sc = new StatsCollectionContext(jobConf);
+ sc.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, jobConf));
+ if (!statsPublisher.init(sc)) { // creating stats table if not exists
if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw new HiveException(
ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 19da1c3..bf950cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.Utils;
@@ -1094,8 +1095,10 @@ public class DagUtils {
StatsPublisher statsPublisher;
StatsFactory factory = StatsFactory.newFactory(conf);
if (factory != null) {
+ StatsCollectionContext sCntxt = new StatsCollectionContext(conf);
+ sCntxt.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, conf));
statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (!statsPublisher.init(sCntxt)) { // creating stats table if not exists
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
index e67996d..68709b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/AggregateIndexHandler.java
@@ -153,7 +153,6 @@ public class AggregateIndexHandler extends CompactIndexHandler {
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
command, (LinkedHashMap<String, String>) partSpec, indexTableName, dbName);
- super.setStatsDir(builderConf);
return rootTask;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
index a019350..807959e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/TableBasedIndexHandler.java
@@ -116,13 +116,6 @@ public abstract class TableBasedIndexHandler extends AbstractIndexHandler {
return null;
}
- protected void setStatsDir(HiveConf builderConf) {
- String statsDir;
- if ((statsDir = builderConf.get(StatsSetupConst.STATS_TMP_LOC)) != null) {
- getConf().set(StatsSetupConst.STATS_TMP_LOC, statsDir);
- }
- }
-
protected List<String> getPartKVPairStringArray(
LinkedHashMap<String, String> partSpec) {
List<String> ret = new ArrayList<String>(partSpec.size());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
index b076933..cb191ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/bitmap/BitmapIndexHandler.java
@@ -289,7 +289,6 @@ public class BitmapIndexHandler extends TableBasedIndexHandler {
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
command, partSpec, indexTableName, dbName);
- super.setStatsDir(builderConf);
return rootTask;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
index 1dbe230..586e16d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
@@ -150,7 +150,6 @@ public class CompactIndexHandler extends TableBasedIndexHandler {
builderConf.setBoolVar(HiveConf.ConfVars.HIVEMERGETEZFILES, false);
Task<?> rootTask = IndexUtils.createRootTask(builderConf, inputs, outputs,
command, partSpec, indexTableName, dbName);
- super.setStatsDir(builderConf);
return rootTask;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
index 3e1ef0a..be3a671 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanMapper.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileValueBufferWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.CombineHiveKey;
@@ -145,7 +146,9 @@ public class PartialScanMapper extends MapReduceBase implements
throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
}
- if (!statsPublisher.connect(jc)) {
+ StatsCollectionContext sc = new StatsCollectionContext(jc);
+ sc.setStatsTmpDir(jc.get(StatsSetupConst.STATS_TMP_LOC, ""));
+ if (!statsPublisher.connect(sc)) {
// should fail since stats gathering is main purpose of the job
LOG.error("StatsPublishing error: cannot connect to database");
throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
@@ -170,7 +173,7 @@ public class PartialScanMapper extends MapReduceBase implements
throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
}
- if (!statsPublisher.closeConnection()) {
+ if (!statsPublisher.closeConnection(sc)) {
// The original exception is lost.
// Not changing the interface to maintain backward compatibility
throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
index cee0878..8bebd0f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.io.NullWritable;
@@ -145,7 +148,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
LOG.info("Using " + inpFormat);
try {
- job.setInputFormat((Class<? extends InputFormat>) JavaUtils.loadClass(inpFormat));
+ job.setInputFormat(JavaUtils.loadClass(inpFormat));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e.getMessage(), e);
}
@@ -175,7 +178,7 @@ public class PartialScanTask extends Task<PartialScanWork> implements
HiveConf.setVar(job,
HiveConf.ConfVars.HIVE_STATS_KEY_PREFIX,
work.getAggKey());
-
+ job.set(StatsSetupConst.STATS_TMP_LOC, work.getStatsTmpDir());
try {
addInputPaths(job, work);
@@ -205,7 +208,9 @@ public class PartialScanTask extends Task<PartialScanWork> implements
StatsFactory factory = StatsFactory.newFactory(job);
if (factory != null) {
statsPublisher = factory.getStatsPublisher();
- if (!statsPublisher.init(job)) { // creating stats table if not exists
+ StatsCollectionContext sc = new StatsCollectionContext(job);
+ sc.setStatsTmpDir(work.getStatsTmpDir());
+ if (!statsPublisher.init(sc)) { // creating stats table if not exists
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
throw
new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
index c0a8ae7..c006743 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hive.ql.io.rcfile.stats;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
+
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -42,6 +46,7 @@ public class PartialScanWork extends MapWork implements Serializable {
private transient List<Path> inputPaths;
private String aggKey;
+ private String statsTmpDir;
public PartialScanWork() {
}
@@ -101,4 +106,13 @@ public class PartialScanWork extends MapWork implements Serializable {
this.aggKey = aggKey;
}
+ public String getStatsTmpDir() {
+ return statsTmpDir;
+ }
+
+ public void setStatsTmpDir(String statsTmpDir, HiveConf conf) {
+ this.statsTmpDir = HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())
+ ? statsTmpDir : "";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
index eed1d7c..af0ac90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRTableScan1.java
@@ -64,6 +64,7 @@ public class GenMRTableScan1 implements NodeProcessor {
* @param opProcCtx
* context
*/
+ @Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
Object... nodeOutputs) throws SemanticException {
TableScanOperator op = (TableScanOperator) nd;
@@ -121,6 +122,7 @@ public class GenMRTableScan1 implements NodeProcessor {
StatsWork statsWork = new StatsWork(op.getConf().getTableMetadata().getTableSpec());
statsWork.setAggKey(op.getConf().getStatsAggPrefix());
+ statsWork.setStatsTmpDir(op.getConf().getTmpStatsDir());
statsWork.setSourceTask(currTask);
statsWork.setStatsReliable(parseCtx.getConf().getBoolVar(
HiveConf.ConfVars.HIVE_STATS_RELIABLE));
@@ -195,6 +197,7 @@ public class GenMRTableScan1 implements NodeProcessor {
PartialScanWork scanWork = new PartialScanWork(inputPaths);
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
+ scanWork.setStatsTmpDir(op.getConf().getTmpStatsDir(), parseCtx.getConf());
// stats work
statsWork.setPartialScanAnalyzeCommand(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index c696fd5..e8bd33d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1416,7 +1416,7 @@ public final class GenMapRedUtils {
statsWork.setSourceTask(currTask);
statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
-
+ statsWork.setStatsTmpDir(nd.getConf().getStatsTmpDir());
if (currTask.getWork() instanceof MapredWork) {
MapredWork mrWork = (MapredWork) currTask.getWork();
mrWork.getMapWork().setGatheringStats(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
index f8d6905..16b4376 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ProcessAnalyzeTable.java
@@ -70,7 +70,7 @@ public class ProcessAnalyzeTable implements NodeProcessor {
throws SemanticException {
GenTezProcContext context = (GenTezProcContext) procContext;
-
+
TableScanOperator tableScan = (TableScanOperator) nd;
ParseContext parseContext = context.parseContext;
@@ -124,6 +124,7 @@ public class ProcessAnalyzeTable implements NodeProcessor {
StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
+ statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir());
statsWork.setSourceTask(context.currentTask);
statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseContext.getConf());
@@ -181,6 +182,7 @@ public class ProcessAnalyzeTable implements NodeProcessor {
PartialScanWork scanWork = new PartialScanWork(inputPaths);
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
+ scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());
// stats work
statsWork.setPartialScanAnalyzeCommand(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a114281..4af07ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6638,8 +6638,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString();
- LOG.info("Set stats collection dir : " + statsTmpLoc);
- conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
+ fileSinkDesc.setStatsTmpDir(statsTmpLoc);
+ LOG.debug("Set stats collection dir : " + statsTmpLoc);
}
if (dest_part != null) {
@@ -9541,8 +9541,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
if (HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
String statsTmpLoc = ctx.getExtTmpPathRelTo(tab.getPath()).toString();
- LOG.info("Set stats collection dir : " + statsTmpLoc);
- conf.set(StatsSetupConst.STATS_TMP_LOC, statsTmpLoc);
+ LOG.debug("Set stats collection dir : " + statsTmpLoc);
+ tsDesc.setTmpStatsDir(statsTmpLoc);
}
tsDesc.setGatherStats(true);
tsDesc.setStatsReliable(conf.getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index 66e148f..7ab4e7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -121,6 +121,7 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
StatsWork statsWork = new StatsWork(tableScan.getConf().getTableMetadata().getTableSpec());
statsWork.setAggKey(tableScan.getConf().getStatsAggPrefix());
+ statsWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir());
statsWork.setSourceTask(context.currentTask);
statsWork.setStatsReliable(parseContext.getConf().getBoolVar(HiveConf.ConfVars.HIVE_STATS_RELIABLE));
Task<StatsWork> statsTask = TaskFactory.get(statsWork, parseContext.getConf());
@@ -176,6 +177,7 @@ public class SparkProcessAnalyzeTable implements NodeProcessor {
PartialScanWork scanWork = new PartialScanWork(inputPaths);
scanWork.setMapperCannotSpanPartns(true);
scanWork.setAggKey(aggregationKey);
+ scanWork.setStatsTmpDir(tableScan.getConf().getTmpStatsDir(), parseContext.getConf());
// stats work
statsWork.setPartialScanAnalyzeCommand(true);
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index f73b502..9d6318a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -86,6 +86,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
private boolean statsReliable;
private ListBucketingCtx lbCtx;
private int maxStatsKeyPrefixLength = -1;
+ private String statsTmpDir;
private boolean statsCollectRawDataSize;
@@ -156,7 +157,8 @@ public class FileSinkDesc extends AbstractOperatorDesc {
ret.setDpSortState(dpSortState);
ret.setWriteType(writeType);
ret.setTransactionId(txnId);
- return (Object) ret;
+ ret.setStatsTmpDir(statsTmpDir);
+ return ret;
}
@Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
@@ -229,7 +231,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public void setMultiFileSpray(boolean multiFileSpray) {
this.multiFileSpray = multiFileSpray;
}
-
+
/**
* @return destination is temporary
*/
@@ -465,4 +467,14 @@ public class FileSinkDesc extends AbstractOperatorDesc {
public void setTable(Table table) {
this.table = table;
}
+
+
+ public String getStatsTmpDir() {
+ return statsTmpDir;
+ }
+
+ public void setStatsTmpDir(String statsCollectionTempDir) {
+ this.statsTmpDir = statsCollectionTempDir;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
index c8515db..d87022d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/StatsWork.java
@@ -57,6 +57,9 @@ public class StatsWork implements Serializable {
// so this is set by DriverContext in runtime
private transient Task sourceTask;
+ // used by FS based stats collector
+ private String statsTmpDir;
+
public StatsWork() {
}
@@ -72,10 +75,6 @@ public class StatsWork implements Serializable {
this.loadFileDesc = loadFileDesc;
}
- public StatsWork(boolean statsReliable) {
- this.statsReliable = statsReliable;
- }
-
public TableSpec getTableSpecs() {
return tableSpecs;
}
@@ -97,6 +96,14 @@ public class StatsWork implements Serializable {
return aggKey;
}
+ public String getStatsTmpDir() {
+ return statsTmpDir;
+ }
+
+ public void setStatsTmpDir(String statsTmpDir) {
+ this.statsTmpDir = statsTmpDir;
+ }
+
public boolean getNoStatsAggregator() {
return noStatsAggregator;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 98bce96..6661ce6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.PTFUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.TableSample;
@@ -70,6 +69,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
private boolean gatherStats;
private boolean statsReliable;
private int maxStatsKeyPrefixLength = -1;
+ private String tmpStatsDir;
private ExprNodeGenericFuncDesc filterExpr;
private transient Serializable filterObject;
@@ -203,6 +203,14 @@ public class TableScanDesc extends AbstractOperatorDesc {
return gatherStats;
}
+ public String getTmpStatsDir() {
+ return tmpStatsDir;
+ }
+
+ public void setTmpStatsDir(String tmpStatsDir) {
+ this.tmpStatsDir = tmpStatsDir;
+ }
+
public List<VirtualColumn> getVirtualCols() {
return virtualCols;
}
@@ -264,7 +272,7 @@ public class TableScanDesc extends AbstractOperatorDesc {
public void setBucketFileNameMapping(Map<String, Integer> bucketFileNameMapping) {
this.bucketFileNameMapping = bucketFileNameMapping;
}
-
+
public void setIsMetadataOnly(boolean metadata_only) {
isMetadataOnly = metadata_only;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
index 16b4460..b9863d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregator.java
@@ -25,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
@@ -40,10 +39,11 @@ public class CounterStatsAggregator implements StatsAggregator, StatsCollectionT
private JobClient jc;
@Override
- public boolean connect(Configuration hconf, Task sourceTask) {
+ public boolean connect(StatsCollectionContext scc) {
+ Task<?> sourceTask = scc.getTask();
if (sourceTask instanceof MapRedTask) {
try {
- jc = new JobClient(toJobConf(hconf));
+ jc = new JobClient(toJobConf(scc.getHiveConf()));
RunningJob job = jc.getJob(((MapRedTask)sourceTask).getJobID());
if (job != null) {
counters = job.getCounters();
@@ -71,7 +71,7 @@ public class CounterStatsAggregator implements StatsAggregator, StatsCollectionT
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext scc) {
try {
jc.close();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
index 13f6024..4c01b25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java
@@ -33,8 +33,8 @@ public class CounterStatsAggregatorSpark
@SuppressWarnings("rawtypes")
@Override
- public boolean connect(Configuration hconf, Task sourceTask) {
- SparkTask task = (SparkTask) sourceTask;
+ public boolean connect(StatsCollectionContext scc) {
+ SparkTask task = (SparkTask) scc.getTask();
sparkCounters = task.getSparkCounters();
if (sparkCounters == null) {
return false;
@@ -52,7 +52,7 @@ public class CounterStatsAggregatorSpark
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext scc) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java
index 02e8c0b..662c106 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorTez.java
@@ -18,11 +18,8 @@
package org.apache.hadoop.hive.ql.stats;
-import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.tez.common.counters.TezCounters;
@@ -46,10 +43,11 @@ public class CounterStatsAggregatorTez implements StatsAggregator, StatsCollecti
}
@Override
- public boolean connect(Configuration hconf, Task sourceTask) {
+ public boolean connect(StatsCollectionContext scc) {
+ Task sourceTask = scc.getTask();
if (!(sourceTask instanceof TezTask)) {
delegate = true;
- return mrAggregator.connect(hconf, sourceTask);
+ return mrAggregator.connect(scc);
}
counters = ((TezTask) sourceTask).getTezCounters();
return counters != null;
@@ -75,7 +73,7 @@ public class CounterStatsAggregatorTez implements StatsAggregator, StatsCollecti
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext scc) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
index bf7d027..e5f1400 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsPublisher.java
@@ -22,7 +22,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.mapred.Reporter;
@@ -33,12 +32,12 @@ public class CounterStatsPublisher implements StatsPublisher, StatsCollectionTas
private Reporter reporter;
@Override
- public boolean init(Configuration hconf) {
+ public boolean init(StatsCollectionContext context) {
return true;
}
@Override
- public boolean connect(Configuration hconf) {
+ public boolean connect(StatsCollectionContext statsContext) {
MapredContext context = MapredContext.get();
if (context == null || context.getReporter() == null) {
return false;
@@ -61,7 +60,7 @@ public class CounterStatsPublisher implements StatsPublisher, StatsCollectionTas
return true;
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext context) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
index 0ae0489..b115daf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsAggregator.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.hive.ql.stats;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.Task;
-
/**
* An interface for any possible implementation for gathering statistics.
*/
@@ -35,7 +32,7 @@ public interface StatsAggregator {
* @param sourceTask
* @return true if connection is successful, false otherwise.
*/
- public boolean connect(Configuration hconf, Task sourceTask);
+ public boolean connect(StatsCollectionContext scc);
/**
* This method aggregates a given statistic from all tasks (partial stats).
@@ -65,7 +62,7 @@ public interface StatsAggregator {
*
* @return true if close connection is successful, false otherwise.
*/
- public boolean closeConnection();
+ public boolean closeConnection(StatsCollectionContext scc);
/**
* This method is called after all statistics have been aggregated. Since we support multiple
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java
new file mode 100644
index 0000000..ae6f2ac
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsCollectionContext.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.stats;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Task;
+
+public class StatsCollectionContext {
+
+ private final Configuration hiveConf;
+ private Task task;
+ private List<String> statsTmpDirs;
+
+ public List<String> getStatsTmpDirs() {
+ return statsTmpDirs;
+ }
+
+ public void setStatsTmpDirs(List<String> statsTmpDirs) {
+ this.statsTmpDirs = statsTmpDirs;
+ }
+
+ public void setStatsTmpDir(String statsTmpDir) {
+ this.statsTmpDirs = statsTmpDir == null ? new ArrayList<String>() :
+ Arrays.asList(new String[]{statsTmpDir});
+ }
+
+ public StatsCollectionContext(Configuration hiveConf) {
+ super();
+ this.hiveConf = hiveConf;
+ }
+
+ public Configuration getHiveConf() {
+ return hiveConf;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void setTask(Task task) {
+ this.task = task;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java
index 845ec6a..3631b83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsPublisher.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.stats;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-
/**
* An interface for any possible implementation for publishing statics.
*/
@@ -37,14 +35,14 @@ public interface StatsPublisher {
* intermediate stats database.
* @return true if initialization is successful, false otherwise.
*/
- public boolean init(Configuration hconf);
+ public boolean init(StatsCollectionContext context);
/**
* This method connects to the intermediate statistics database.
* @param hconf HiveConf that contains the connection parameters.
* @return true if connection is successful, false otherwise.
*/
- public boolean connect(Configuration hconf);
+ public boolean connect(StatsCollectionContext context);
/**
* This method publishes a given statistic into a disk storage, possibly HBase or MySQL.
@@ -66,6 +64,6 @@ public interface StatsPublisher {
/**
* This method closes the connection to the temporary storage.
*/
- public boolean closeConnection();
+ public boolean closeConnection(StatsCollectionContext context);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
index be025fb..6dfc178 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsAggregator.java
@@ -26,15 +26,14 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
import com.esotericsoftware.kryo.io.Input;
@@ -44,17 +43,17 @@ public class FSStatsAggregator implements StatsAggregator, StatsCollectionTaskIn
private List<Map<String,Map<String,String>>> statsList;
private Map<String, Map<String,String>> statsMap;
private FileSystem fs;
- private Configuration conf;
@Override
- public boolean connect(Configuration hconf, Task sourceTask) {
- conf = hconf;
- Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC));
+ public boolean connect(StatsCollectionContext scc) {
+ List<String> statsDirs = scc.getStatsTmpDirs();
+ assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+ Path statsDir = new Path(statsDirs.get(0));
LOG.debug("About to read stats from : " + statsDir);
statsMap = new HashMap<String, Map<String,String>>();
try {
- fs = statsDir.getFileSystem(hconf);
+ fs = statsDir.getFileSystem(scc.getHiveConf());
statsList = new ArrayList<Map<String,Map<String,String>>>();
FileStatus[] status = fs.listStatus(statsDir, new PathFilter() {
@Override
@@ -98,11 +97,15 @@ public class FSStatsAggregator implements StatsAggregator, StatsCollectionTaskIn
}
@Override
- public boolean closeConnection() {
- LOG.debug("About to delete stats tmp dir");
+ public boolean closeConnection(StatsCollectionContext scc) {
+ List<String> statsDirs = scc.getStatsTmpDirs();
+ assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+ Path statsDir = new Path(statsDirs.get(0));
+
+ LOG.debug("About to delete stats tmp dir :" + statsDir);
try {
- fs.delete(new Path(conf.get(StatsSetupConst.STATS_TMP_LOC)),true);
+ fs.delete(statsDir,true);
return true;
} catch (IOException e) {
LOG.error(e);
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
index ce96064..aa2bf62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/fs/FSStatsPublisher.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.stats.fs;
import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import com.esotericsoftware.kryo.io.Output;
@@ -41,12 +43,14 @@ public class FSStatsPublisher implements StatsPublisher, StatsCollectionTaskInde
private Map<String, Map<String,String>> statsMap; // map from partID -> (statType->value)
@Override
- public boolean init(Configuration hconf) {
- Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC));
- LOG.debug("Initing FSStatsPublisher with : " + statsDir);
+ public boolean init(StatsCollectionContext context) {
try {
- statsDir.getFileSystem(hconf).mkdirs(statsDir);
- LOG.info("created : " + statsDir);
+ for (String tmpDir : context.getStatsTmpDirs()) {
+ Path statsDir = new Path(tmpDir);
+ LOG.debug("Initing FSStatsPublisher with : " + statsDir);
+ statsDir.getFileSystem(context.getHiveConf()).mkdirs(statsDir);
+ LOG.info("created : " + statsDir);
+ }
return true;
} catch (IOException e) {
LOG.error(e);
@@ -55,9 +59,11 @@ public class FSStatsPublisher implements StatsPublisher, StatsCollectionTaskInde
}
@Override
- public boolean connect(Configuration hconf) {
- conf = hconf;
- Path statsDir = new Path(hconf.get(StatsSetupConst.STATS_TMP_LOC));
+ public boolean connect(StatsCollectionContext context) {
+ conf = context.getHiveConf();
+ List<String> statsDirs = context.getStatsTmpDirs();
+ assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+ Path statsDir = new Path(statsDirs.get(0));
LOG.debug("Connecting to : " + statsDir);
statsMap = new HashMap<String, Map<String,String>>();
try {
@@ -85,14 +91,16 @@ public class FSStatsPublisher implements StatsPublisher, StatsCollectionTaskInde
}
@Override
- public boolean closeConnection() {
- Path statsDir = new Path(conf.get(StatsSetupConst.STATS_TMP_LOC));
+ public boolean closeConnection(StatsCollectionContext context) {
+ List<String> statsDirs = context.getStatsTmpDirs();
+ assert statsDirs.size() == 1 : "Found multiple stats dirs: " + statsDirs;
+ Path statsDir = new Path(statsDirs.get(0));
try {
Path statsFile = new Path(statsDir,StatsSetupConst.STATS_FILE_PREFIX +conf.getInt("mapred.task.partition",0));
LOG.debug("About to create stats file for this task : " + statsFile);
Output output = new Output(statsFile.getFileSystem(conf).create(statsFile,true));
- LOG.info("Created file : " + statsFile);
- LOG.info("Writing stats in it : " + statsMap);
+ LOG.debug("Created file : " + statsFile);
+ LOG.debug("Writing stats in it : " + statsMap);
Utilities.runtimeSerializationKryo.get().writeObject(output, statsMap);
output.close();
return true;
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
index e92523e..d8c9926 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
@@ -34,16 +34,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
public class JDBCStatsAggregator implements StatsAggregator {
private Connection conn;
private String connectionString;
private Configuration hiveconf;
- private Task<?> sourceTask;
private final Map<String, PreparedStatement> columnMapping;
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private int timeout = 30;
@@ -58,8 +57,8 @@ public class JDBCStatsAggregator implements StatsAggregator {
}
@Override
- public boolean connect(Configuration hiveconf, Task sourceTask) {
- this.hiveconf = hiveconf;
+ public boolean connect(StatsCollectionContext scc) {
+ this.hiveconf = scc.getHiveConf();
timeout = (int) HiveConf.getTimeVar(
hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
@@ -67,7 +66,6 @@ public class JDBCStatsAggregator implements StatsAggregator {
maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
waitWindow = HiveConf.getTimeVar(
hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
- this.sourceTask = sourceTask;
try {
JavaUtils.loadClass(driver).newInstance();
@@ -159,14 +157,14 @@ public class JDBCStatsAggregator implements StatsAggregator {
return null;
}
// close the current connection
- closeConnection();
+ closeConnection(null);
long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
try {
Thread.sleep(waitTime);
} catch (InterruptedException iex) {
}
// getting a new connection
- if (!connect(hiveconf, sourceTask)) {
+ if (!connect(new StatsCollectionContext(hiveconf))) {
// if cannot reconnect, just fail because connect() already handles retries.
LOG.error("Error during publishing aggregation. " + e);
return null;
@@ -181,7 +179,7 @@ public class JDBCStatsAggregator implements StatsAggregator {
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext scc) {
if (conn == null) {
return true;
@@ -238,14 +236,14 @@ public class JDBCStatsAggregator implements StatsAggregator {
return false;
}
// close the current connection
- closeConnection();
+ closeConnection(null);
long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
try {
Thread.sleep(waitTime);
} catch (InterruptedException iex) {
}
// getting a new connection
- if (!connect(hiveconf, sourceTask)) {
+ if (!connect(new StatsCollectionContext(hiveconf))) {
LOG.error("Error during clean-up. " + e);
return false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
index aeb3d27..0318a8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
public class JDBCStatsPublisher implements StatsPublisher {
@@ -59,8 +60,8 @@ public class JDBCStatsPublisher implements StatsPublisher {
}
@Override
- public boolean connect(Configuration hiveconf) {
- this.hiveconf = hiveconf;
+ public boolean connect(StatsCollectionContext context) {
+ this.hiveconf = context.getHiveConf();
maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
waitWindow = HiveConf.getTimeVar(
hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
@@ -209,15 +210,16 @@ public class JDBCStatsPublisher implements StatsPublisher {
if (failures >= maxRetries) {
return false;
}
+ StatsCollectionContext sCntxt = new StatsCollectionContext(hiveconf);
// close the current connection
- closeConnection();
+ closeConnection(sCntxt);
long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, r);
try {
Thread.sleep(waitTime);
} catch (InterruptedException iex) {
}
// get a new connection
- if (!connect(hiveconf)) {
+ if (!connect(sCntxt)) {
// if cannot reconnect, just fail because connect() already handles retries.
LOG.error("Error during publishing aggregation. " + e);
return false;
@@ -226,7 +228,7 @@ public class JDBCStatsPublisher implements StatsPublisher {
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext context) {
if (conn == null) {
return true;
}
@@ -266,13 +268,13 @@ public class JDBCStatsPublisher implements StatsPublisher {
* creating tables.).
*/
@Override
- public boolean init(Configuration hconf) {
+ public boolean init(StatsCollectionContext context) {
Statement stmt = null;
ResultSet rs = null;
try {
- this.hiveconf = hconf;
- connectionString = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- String driver = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
+ this.hiveconf = context.getHiveConf();
+ connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
+ String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
JavaUtils.loadClass(driver).newInstance();
synchronized(DriverManager.class) {
DriverManager.setLoginTimeout(timeout);
@@ -339,7 +341,7 @@ public class JDBCStatsPublisher implements StatsPublisher {
// do nothing
}
}
- closeConnection();
+ closeConnection(context);
}
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 4594836..d22d022 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -198,8 +199,6 @@ public class TestFileSinkOperator {
@Before
public void setup() throws Exception {
jc = new JobConf();
- jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator",
- "stats").getPath());
jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname,
TFSOStatsPublisher.class.getName());
jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname,
@@ -857,12 +856,12 @@ public class TestFileSinkOperator {
static Map<String, String> stats;
@Override
- public boolean init(Configuration hconf) {
+ public boolean init(StatsCollectionContext context) {
return true;
}
@Override
- public boolean connect(Configuration hconf) {
+ public boolean connect(StatsCollectionContext context) {
return true;
}
@@ -873,7 +872,7 @@ public class TestFileSinkOperator {
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext context) {
return true;
}
}
@@ -881,7 +880,7 @@ public class TestFileSinkOperator {
public static class TFSOStatsAggregator implements StatsAggregator {
@Override
- public boolean connect(Configuration hconf, Task sourceTask) {
+ public boolean connect(StatsCollectionContext scc) {
return true;
}
@@ -891,7 +890,7 @@ public class TestFileSinkOperator {
}
@Override
- public boolean closeConnection() {
+ public boolean closeConnection(StatsCollectionContext scc) {
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9b4826e7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
index 887716e..c257797 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestStatsPublisherEnhanced.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.mapred.JobConf;
@@ -60,9 +61,10 @@ public class TestStatsPublisherEnhanced extends TestCase {
protected void tearDown() {
StatsAggregator sa = factory.getStatsAggregator();
assertNotNull(sa);
- assertTrue(sa.connect(conf, null));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(sa.connect(sc));
assertTrue(sa.cleanUp("file_0"));
- assertTrue(sa.closeConnection());
+ assertTrue(sa.closeConnection(sc));
}
private void fillStatMap(String numRows, String rawDataSize) {
@@ -80,13 +82,14 @@ public class TestStatsPublisherEnhanced extends TestCase {
// instantiate stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
assertNotNull(statsPublisher);
- assertTrue(statsPublisher.init(conf));
- assertTrue(statsPublisher.connect(conf));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(statsPublisher.init(sc));
+ assertTrue(statsPublisher.connect(sc));
// instantiate stats aggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
assertNotNull(statsAggregator);
- assertTrue(statsAggregator.connect(conf, null));
+ assertTrue(statsAggregator.connect(sc));
// publish stats
fillStatMap("200", "1000");
@@ -109,8 +112,8 @@ public class TestStatsPublisherEnhanced extends TestCase {
assertEquals("3000", usize1);
// close connections
- assertTrue(statsPublisher.closeConnection());
- assertTrue(statsAggregator.closeConnection());
+ assertTrue(statsPublisher.closeConnection(sc));
+ assertTrue(statsAggregator.closeConnection(sc));
System.out
.println("StatsPublisher - one stat published per key - aggregating matching key - OK");
@@ -128,13 +131,14 @@ public class TestStatsPublisherEnhanced extends TestCase {
StatsPublisher statsPublisher = Utilities.getStatsPublisher(
(JobConf) conf);
assertNotNull(statsPublisher);
- assertTrue(statsPublisher.init(conf));
- assertTrue(statsPublisher.connect(conf));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(statsPublisher.init(sc));
+ assertTrue(statsPublisher.connect(sc));
// instantiate stats aggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
assertNotNull(statsAggregator);
- assertTrue(statsAggregator.connect(conf, null));
+ assertTrue(statsAggregator.connect(sc));
// statsAggregator.cleanUp("file_0000");
// assertTrue(statsAggregator.connect(conf));
@@ -172,8 +176,8 @@ public class TestStatsPublisherEnhanced extends TestCase {
assertTrue(statsAggregator.cleanUp("file_0000"));
// close connections
- assertTrue(statsPublisher.closeConnection());
- assertTrue(statsAggregator.closeConnection());
+ assertTrue(statsPublisher.closeConnection(sc));
+ assertTrue(statsAggregator.closeConnection(sc));
System.out.println("StatsPublisher - basic functionality - OK");
} catch (Throwable e) {
@@ -189,13 +193,14 @@ public class TestStatsPublisherEnhanced extends TestCase {
// instantiate stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
assertNotNull(statsPublisher);
- assertTrue(statsPublisher.init(conf));
- assertTrue(statsPublisher.connect(conf));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(statsPublisher.init(sc));
+ assertTrue(statsPublisher.connect(sc));
// instantiate stats aggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
assertNotNull(statsAggregator);
- assertTrue(statsAggregator.connect(conf, null));
+ assertTrue(statsAggregator.connect(sc));
// publish stats
fillStatMap("200", "1000");
@@ -236,8 +241,8 @@ public class TestStatsPublisherEnhanced extends TestCase {
assertTrue(statsAggregator.cleanUp("file_0000"));
// close connections
- assertTrue(statsPublisher.closeConnection());
- assertTrue(statsAggregator.closeConnection());
+ assertTrue(statsPublisher.closeConnection(sc));
+ assertTrue(statsAggregator.closeConnection(sc));
System.out.println("StatsPublisher - multiple updates - OK");
} catch (Throwable e) {
@@ -254,13 +259,14 @@ public class TestStatsPublisherEnhanced extends TestCase {
// instantiate stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
assertNotNull(statsPublisher);
- assertTrue(statsPublisher.init(conf));
- assertTrue(statsPublisher.connect(conf));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(statsPublisher.init(sc));
+ assertTrue(statsPublisher.connect(sc));
// instantiate stats aggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
assertNotNull(statsAggregator);
- assertTrue(statsAggregator.connect(conf, null));
+ assertTrue(statsAggregator.connect(sc));
// publish stats
fillStatMap("200", "");
@@ -305,8 +311,8 @@ public class TestStatsPublisherEnhanced extends TestCase {
assertTrue(statsAggregator.cleanUp("file_0000"));
// close connections
- assertTrue(statsPublisher.closeConnection());
- assertTrue(statsAggregator.closeConnection());
+ assertTrue(statsPublisher.closeConnection(sc));
+ assertTrue(statsAggregator.closeConnection(sc));
System.out
.println("StatsPublisher - (multiple updates + publishing subset of supported statistics) - OK");
@@ -325,13 +331,14 @@ public class TestStatsPublisherEnhanced extends TestCase {
// instantiate stats publisher
StatsPublisher statsPublisher = Utilities.getStatsPublisher((JobConf) conf);
assertNotNull(statsPublisher);
- assertTrue(statsPublisher.init(conf));
- assertTrue(statsPublisher.connect(conf));
+ StatsCollectionContext sc = new StatsCollectionContext(conf);
+ assertTrue(statsPublisher.init(sc));
+ assertTrue(statsPublisher.connect(sc));
// instantiate stats aggregator
StatsAggregator statsAggregator = factory.getStatsAggregator();
assertNotNull(statsAggregator);
- assertTrue(statsAggregator.connect(conf, null));
+ assertTrue(statsAggregator.connect(sc));
// publish stats
fillStatMap("200", "1000");
@@ -364,8 +371,8 @@ public class TestStatsPublisherEnhanced extends TestCase {
assertTrue(statsAggregator.cleanUp("file_0000"));
// close connections
- assertTrue(statsPublisher.closeConnection());
- assertTrue(statsAggregator.closeConnection());
+ assertTrue(statsPublisher.closeConnection(sc));
+ assertTrue(statsAggregator.closeConnection(sc));
System.out.println("StatsAggregator - clean-up - OK");
} catch (Throwable e) {