You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/11/06 14:53:27 UTC
git commit: updated refs/heads/trunk to e5a21c4
Updated Branches:
refs/heads/trunk 3a20c5597 -> e5a21c4bb
GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e5a21c4b
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e5a21c4b
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e5a21c4b
Branch: refs/heads/trunk
Commit: e5a21c4bb27055cad85f1d9dd494867c0fae8c22
Parents: 3a20c55
Author: Claudio Martella <cl...@gmail.com>
Authored: Wed Nov 6 14:52:45 2013 +0100
Committer: Claudio Martella <cl...@gmail.com>
Committed: Wed Nov 6 14:52:45 2013 +0100
----------------------------------------------------------------------
CHANGELOG | 2 ++
.../java/org/apache/giraph/bsp/BspService.java | 5 ++--
.../apache/giraph/conf/GiraphConfiguration.java | 21 ++++++++++++++
.../org/apache/giraph/conf/GiraphConstants.java | 16 ++++++++++-
.../apache/giraph/graph/GraphTaskManager.java | 29 ++++++++------------
.../apache/giraph/master/BspServiceMaster.java | 6 ++--
.../apache/giraph/worker/BspServiceWorker.java | 4 +--
.../apache/giraph/yarn/GiraphYarnClient.java | 6 ++--
8 files changed, 58 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 04656f0..cd04a2b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-755: Make ZooKeeper port list available to input/output format (armax00 via claudio)
+
GIRAPH-737: Giraph Application Master: Move to new and stable YARN API (mislam via ereisman)
GIRAPH-791: HiveGiraphRunner picks -D options too late (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 34f4b51..86823ed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -253,13 +253,11 @@ public abstract class BspService<I extends WritableComparable,
/**
* Constructor.
*
- * @param serverPortList ZooKeeper server port list
* @param sessionMsecTimeout ZooKeeper session timeount in milliseconds
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
*/
- public BspService(String serverPortList,
- int sessionMsecTimeout,
+ public BspService(int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
this.vertexInputSplitsEvents = new InputSplitEvents(context);
@@ -322,6 +320,7 @@ public abstract class BspService<I extends WritableComparable,
CHECKPOINT_DIRECTORY.getWithDefault(getConfiguration(),
CHECKPOINT_DIRECTORY.getDefaultValue() + "/" + getJobId());
masterElectionPath = basePath + MASTER_ELECTION_DIR;
+ String serverPortList = conf.getZookeeperList();
haltComputationPath = basePath + HALT_COMPUTATION_NODE;
getContext().getCounter(GiraphConstants.ZOOKEEPER_HALT_NODE_COUNTER_GROUP,
haltComputationPath);
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index f176bfe..d066513 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -731,6 +731,27 @@ public class GiraphConfiguration extends Configuration
return get(ZOOKEEPER_LIST);
}
+ /**
+ * Set the ZooKeeper server list (comma-separated host:port entries) and mark
+ * ZooKeeper as internally started by setting the zkIsExternal option to
+ * false. Intended for use after Giraph starts ZooKeeper itself.
+ *
+ * @param zkList comma-separated list of ZooKeeper servers
+ */
+ public void setZookeeperList(String zkList) {
+ set(ZOOKEEPER_LIST, zkList);
+ ZOOKEEPER_IS_EXTERNAL.set(this, false);
+ }
+
+ /**
+ * Was ZooKeeper provided externally (i.e. not started by Giraph)?
+ *
+ * @return true iff the ZooKeeper quorum was provided externally
+ */
+ public boolean isZookeeperExternal() {
+ return ZOOKEEPER_IS_EXTERNAL.get(this);
+ }
+
public String getLocalLevel() {
return LOG_LEVEL.get(this);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 6f32e46..3f379f1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -404,10 +404,24 @@ public interface GiraphConstants {
/**
* ZooKeeper comma-separated list (if not set,
- * will start up ZooKeeper locally)
+ * will start up ZooKeeper locally). Note that after ZooKeeper is started
+ * locally, this parameter is updated in the configuration with the actual
+ * server list.
*/
String ZOOKEEPER_LIST = "giraph.zkList";
+ /**
+ * Whether ZooKeeper was provided externally (true, the default) or started
+ * internally by Giraph (false). The ZooKeeper list always holds a value
+ * during the computation, so it cannot be used to tell the two cases apart.
+ */
+ BooleanConfOption ZOOKEEPER_IS_EXTERNAL =
+ new BooleanConfOption("giraph.zkIsExternal", true,
+ "Zookeeper List will always hold a value during " +
+ "the computation while this option provides " +
+ "information regarding whether the zookeeper was " +
+ "internally started or externally provided.");
+
/** ZooKeeper session millisecond timeout */
IntConfOption ZOOKEEPER_SESSION_TIMEOUT =
new IntConfOption("giraph.zkSessionMsecTimeout", MINUTES.toMillis(1),
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 3939d49..f31d99e 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -144,8 +144,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
private GiraphTimerContext communicationTimerContext;
/** Timer for WorkerContext#preSuperstep() */
private GiraphTimer wcPreSuperstepTimer;
- /** Zookeeper host:port list */
- private String serverPortList;
/** The Hadoop Mapper#Context for this job */
private Mapper<?, ?, ?, ?>.Context context;
/** is this GraphTaskManager the master? */
@@ -200,7 +198,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
// Do some task setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
locateZookeeperClasspath(zkPathList);
- serverPortList = conf.getZookeeperList();
+ String serverPortList = conf.getZookeeperList();
if (serverPortList == null && startZooKeeperManager()) {
return; // ZK connect/startup failed
}
@@ -219,7 +217,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
}
int sessionMsecTimeout = conf.getZooKeeperSessionTimeout();
try {
- instantiateBspService(serverPortList, sessionMsecTimeout);
+ instantiateBspService(sessionMsecTimeout);
} catch (IOException e) {
LOG.error("setup: Caught exception just before end of setup", e);
if (zkManager != null) {
@@ -369,7 +367,8 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
return true;
}
zkManager.onlineZooKeeperServers();
- serverPortList = zkManager.getZooKeeperServerPortString();
+ String serverPortList = zkManager.getZooKeeperServerPortString();
+ conf.setZookeeperList(serverPortList);
context.getCounter(GiraphConstants.ZOOKEEPER_SERVER_PORT_COUNTER_GROUP,
serverPortList);
return false;
@@ -493,9 +492,9 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
* ZooKeeper.
* 2) If split master/worker, masters also run ZooKeeper
*
- * 3) If split master/worker == true and <code>giraph.zkList</code> is set,
- * the master will not instantiate a ZK instance, but will assume
- * a quorum is already active on the cluster for Giraph to use.
+ * 3) If split master/worker == true and <code>giraph.zkList</code> is
+ * externally provided, the master will not instantiate a ZK instance, but
+ * will assume a quorum is already active on the cluster for Giraph to use.
*
* @param conf Configuration to use
* @param zkManager ZooKeeper manager to help determine whether to run
@@ -507,7 +506,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
ZooKeeperManager zkManager) {
boolean splitMasterWorker = conf.getSplitMasterWorker();
int taskPartition = conf.getTaskPartition();
- boolean zkAlreadyProvided = conf.getZookeeperList() != null;
+ boolean zkAlreadyProvided = conf.isZookeeperExternal();
GraphFunctions functions = GraphFunctions.UNKNOWN;
// What functions should this mapper do?
if (!splitMasterWorker) {
@@ -538,18 +537,17 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
/**
* Instantiate the appropriate BspService object (Master or Worker)
* for this compute node.
- * @param serverPortList host:port list for connecting to ZK quorum
* @param sessionMsecTimeout configurable session timeout
*/
- private void instantiateBspService(String serverPortList,
- int sessionMsecTimeout) throws IOException, InterruptedException {
+ private void instantiateBspService(int sessionMsecTimeout)
+ throws IOException, InterruptedException {
if (graphFunctions.isMaster()) {
if (LOG.isInfoEnabled()) {
LOG.info("setup: Starting up BspServiceMaster " +
"(master thread)...");
}
serviceMaster = new BspServiceMaster<I, V, E>(
- serverPortList, sessionMsecTimeout, context, this);
+ sessionMsecTimeout, context, this);
masterThread = new MasterThread<I, V, E>(serviceMaster, context);
masterThread.start();
}
@@ -558,10 +556,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
LOG.info("setup: Starting up BspServiceWorker...");
}
serviceWorker = new BspServiceWorker<I, V, E>(
- serverPortList,
- sessionMsecTimeout,
- context,
- this);
+ sessionMsecTimeout, context, this);
if (LOG.isInfoEnabled()) {
LOG.info("setup: Registering health of this worker...");
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index f043c61..baa8434 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -187,17 +187,15 @@ public class BspServiceMaster<I extends WritableComparable,
/**
* Constructor for setting up the master.
*
- * @param serverPortList ZooKeeper server port list
* @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
*/
public BspServiceMaster(
- String serverPortList,
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager) {
- super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+ super(sessionMsecTimeout, context, graphTaskManager);
workerWroteCheckpoint = new PredicateLock(context);
registerBspEvent(workerWroteCheckpoint);
superstepStateChanged = new PredicateLock(context);
@@ -1768,7 +1766,7 @@ public class BspServiceMaster<I extends WritableComparable,
// and the master can do any final cleanup if the ZooKeeper service was
// provided (not dynamically started) and we don't want to keep the data
try {
- if (getConfiguration().getZookeeperList() != null &&
+ if (getConfiguration().isZookeeperExternal() &&
KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanupZooKeeper: Removing the following path " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index a92ddf8..f6da680 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -169,7 +169,6 @@ public class BspServiceWorker<I extends WritableComparable,
/**
* Constructor for setting up the worker.
*
- * @param serverPortList ZooKeeper server port list
* @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
* @param context Mapper context
* @param graphTaskManager GraphTaskManager for this compute node
@@ -177,12 +176,11 @@ public class BspServiceWorker<I extends WritableComparable,
* @throws InterruptedException
*/
public BspServiceWorker(
- String serverPortList,
int sessionMsecTimeout,
Mapper<?, ?, ?, ?>.Context context,
GraphTaskManager<I, V, E> graphTaskManager)
throws IOException, InterruptedException {
- super(serverPortList, sessionMsecTimeout, context, graphTaskManager);
+ super(sessionMsecTimeout, context, graphTaskManager);
ImmutableClassesGiraphConfiguration<I, V, E> conf = getConfiguration();
partitionExchangeChildrenChanged = new PredicateLock(context);
registerBspEvent(partitionExchangeChildrenChanged);
http://git-wip-us.apache.org/repos/asf/giraph/blob/e5a21c4b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
index ab6564e..e8926eb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnClient.java
@@ -455,11 +455,11 @@ public class GiraphYarnClient {
* removed as we expand the functionality of the "pure YARN" Giraph profile.
*/
private void checkJobLocalZooKeeperSupported() {
+ final boolean isZkExternal = giraphConf.isZookeeperExternal(); // defaults to true
final String checkZkList = giraphConf.getZookeeperList();
- if (checkZkList == null || checkZkList.isEmpty()) {
+ if (!isZkExternal || checkZkList == null || checkZkList.isEmpty()) { // zkList may be unset (null) even when zkIsExternal is true
throw new IllegalArgumentException("Giraph on YARN does not currently" +
- "support Giraph-managed ZK instances: use a standalone ZooKeeper: '" +
- checkZkList + "'");
+ " support Giraph-managed ZK instances: use a standalone ZooKeeper.");
}
}