You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ac...@apache.org on 2012/10/05 23:37:17 UTC
svn commit: r1394826 - in /giraph/trunk: ./ src/main/java/org/apache/giraph/
src/main/java/org/apache/giraph/graph/ src/main/java/org/apache/giraph/zk/
Author: aching
Date: Fri Oct 5 21:37:16 2012
New Revision: 1394826
URL: http://svn.apache.org/viewvc?rev=1394826&view=rev
Log:
GIRAPH-356: Improve ZooKeeper issues. (aching)
Modified:
giraph/trunk/CHANGELOG
giraph/trunk/pom.xml
giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
Modified: giraph/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/giraph/trunk/CHANGELOG?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/CHANGELOG (original)
+++ giraph/trunk/CHANGELOG Fri Oct 5 21:37:16 2012
@@ -1,6 +1,9 @@
Giraph Change Log
Release 0.2.0 - unreleased
+
+ GIRAPH-356: Improve ZooKeeper issues. (aching)
+
GIRAPH-342: Recursive ZooKeeper calls should call progress, dynamic
ZooKeeper can skip delete (aching via majakabiljo)
Modified: giraph/trunk/pom.xml
URL: http://svn.apache.org/viewvc/giraph/trunk/pom.xml?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/pom.xml (original)
+++ giraph/trunk/pom.xml Fri Oct 5 21:37:16 2012
@@ -954,5 +954,5 @@ under the License.
<artifactId>netty</artifactId>
<version>3.5.3.Final</version>
</dependency>
- </dependencies>
+ </dependencies>
</project>
Modified: giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/GiraphConfiguration.java Fri Oct 5 21:37:16 2012
@@ -466,6 +466,12 @@ public class GiraphConfiguration extends
public static final String ZOOKEEPER_MANAGER_DIR_DEFAULT =
"_bsp/_defaultZkManagerDir";
+ /** Number of ZooKeeper client connection attempts before giving up. */
+ public static final String ZOOKEEPER_CONNECTION_ATTEMPTS =
+ "giraph.zkConnectionAttempts";
+ /** Default of 10 ZooKeeper client connection attempts before giving up. */
+ public static final int ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT = 10;
+
/** This directory has/stores the available checkpoint files in HDFS. */
public static final String CHECKPOINT_DIRECTORY =
"giraph.checkpointDirectory";
@@ -539,11 +545,22 @@ public class GiraphConfiguration extends
public static final int DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS = 10000;
/** ZooKeeper minimum session timeout */
public static final String ZOOKEEPER_MIN_SESSION_TIMEOUT =
- "giraph.zookeeperMinSessionTimeout";
- /** Default ZooKeeper minimum session timeout of 5 minutes (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 300 * 1000;
- /** Default ZooKeeper maximum session timeout of 10 minutes (in msecs). */
- public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 600 * 1000;
+ "giraph.zKMinSessionTimeout";
+ /** Default ZooKeeper minimum session timeout of 10 minutes (in msecs). */
+ public static final int DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT = 600 * 1000;
+ /** ZooKeeper maximum session timeout */
+ public static final String ZOOKEEPER_MAX_SESSION_TIMEOUT =
+ "giraph.zkMaxSessionTimeout";
+ /** Default ZooKeeper maximum session timeout of 15 minutes (in msecs). */
+ public static final int DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT = 900 * 1000;
+ /** ZooKeeper force sync */
+ public static final String ZOOKEEPER_FORCE_SYNC = "giraph.zKForceSync";
+ /** Default ZooKeeper force sync is off (for performance) */
+ public static final String DEFAULT_ZOOKEEPER_FORCE_SYNC = "no";
+ /** ZooKeeper skip ACLs */
+ public static final String ZOOKEEPER_SKIP_ACL = "giraph.ZkSkipAcl";
+ /** Default ZooKeeper skip ACLs true (for performance) */
+ public static final String DEFAULT_ZOOKEEPER_SKIP_ACL = "yes";
/**
* Constructor that creates the configuration
@@ -787,4 +804,41 @@ public class GiraphConfiguration extends
return getNettyServerThreads();
}
}
+
+ public int getZookeeperConnectionAttempts() {
+ return getInt(ZOOKEEPER_CONNECTION_ATTEMPTS,
+ ZOOKEEPER_CONNECTION_ATTEMPTS_DEFAULT);
+ }
+
+ public int getZooKeeperMinSessionTimeout() {
+ return getInt(ZOOKEEPER_MIN_SESSION_TIMEOUT,
+ DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
+ }
+
+ public int getZooKeeperMaxSessionTimeout() {
+ return getInt(ZOOKEEPER_MAX_SESSION_TIMEOUT,
+ DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT);
+ }
+
+ public String getZooKeeperForceSync() {
+ return get(ZOOKEEPER_FORCE_SYNC, DEFAULT_ZOOKEEPER_FORCE_SYNC);
+ }
+
+ public String getZooKeeperSkipAcl() {
+ return get(ZOOKEEPER_SKIP_ACL, DEFAULT_ZOOKEEPER_SKIP_ACL);
+ }
+
+ /**
+ * Get the number of map tasks in this job
+ *
+ * @return Number of map tasks in this job
+ */
+ public int getMapTasks() {
+ int mapTasks = getInt("mapred.map.tasks", -1);
+ if (mapTasks == -1) {
+ throw new IllegalStateException("getMapTasks: Failed to get the map " +
+ "tasks!");
+ }
+ return mapTasks;
+ }
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Fri Oct 5 21:37:16 2012
@@ -934,13 +934,12 @@ public abstract class BspService<I exten
if ((event.getPath() == null) && (event.getType() == EventType.None)) {
if (event.getState() == KeeperState.Disconnected) {
- // No way to recover from a disconnect event, signal all BspEvents
+ // Watches may not be triggered for some time, so signal all BspEvents
for (BspEvent bspEvent : registeredBspEvents) {
bspEvent.signal();
}
- throw new RuntimeException(
- "process: Disconnected from ZooKeeper, cannot recover - " +
- event);
+ LOG.warn("process: Disconnected from ZooKeeper (will automatically " +
+ "try to recover) " + event);
} else if (event.getState() == KeeperState.SyncConnected) {
if (LOG.isInfoEnabled()) {
LOG.info("process: Asynchronous connection complete.");
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Fri Oct 5 21:37:16 2012
@@ -443,23 +443,8 @@ public class BspServiceMaster<I extends
// Find the missing workers if there are only a few
if ((maxWorkers - totalResponses) <=
partitionLongTailMinPrint) {
- Set<Integer> partitionSet = new TreeSet<Integer>();
- for (WorkerInfo workerInfo : healthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
- }
- for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
- partitionSet.add(workerInfo.getTaskId());
- }
- for (int i = 1; i <= maxWorkers; ++i) {
- if (partitionSet.contains(Integer.valueOf(i))) {
- continue;
- } else if (i == getTaskPartition()) {
- continue;
- } else {
- LOG.info("checkWorkers: No response from " +
- "partition " + i + " (could be master)");
- }
- }
+ logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+ unhealthyWorkerInfoList);
}
}
++pollAttempt;
@@ -477,6 +462,8 @@ public class BspServiceMaster<I extends
if (healthyWorkerInfoList.size() < minWorkers) {
LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
" available when " + minWorkers + " are required.");
+ logMissingWorkersOnSuperstep(healthyWorkerInfoList,
+ unhealthyWorkerInfoList);
return null;
}
@@ -488,6 +475,36 @@ public class BspServiceMaster<I extends
return healthyWorkerInfoList;
}
+ /**
+ * Log info level of the missing workers on the superstep
+ *
+ * @param healthyWorkerInfoList Healthy worker list
+ * @param unhealthyWorkerInfoList Unhealthy worker list
+ */
+ private void logMissingWorkersOnSuperstep(
+ List<WorkerInfo> healthyWorkerInfoList,
+ List<WorkerInfo> unhealthyWorkerInfoList) {
+ if (LOG.isInfoEnabled()) {
+ Set<Integer> partitionSet = new TreeSet<Integer>();
+ for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+ partitionSet.add(workerInfo.getTaskId());
+ }
+ for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+ partitionSet.add(workerInfo.getTaskId());
+ }
+ for (int i = 1; i <= maxWorkers; ++i) {
+ if (partitionSet.contains(Integer.valueOf(i))) {
+ continue;
+ } else if (i == getTaskPartition()) {
+ continue;
+ } else {
+ LOG.info("logMissingWorkersOnSuperstep: No response from " +
+ "partition " + i + " (could be master)");
+ }
+ }
+ }
+ }
+
@Override
public int createInputSplits() {
// Only the 'master' should be doing this. Wait until the number of
@@ -1048,22 +1065,29 @@ public class BspServiceMaster<I extends
// Find the last good checkpoint if none have been written to the
// knowledge of this master
if (lastCheckpointedSuperstep == -1) {
- FileStatus[] fileStatusArray =
- getFs().listStatus(new Path(checkpointBasePath),
- new FinalizedCheckpointPathFilter());
- if (fileStatusArray == null) {
- return -1;
- }
- Arrays.sort(fileStatusArray);
- lastCheckpointedSuperstep = getCheckpoint(
- fileStatusArray[fileStatusArray.length - 1].getPath());
- if (LOG.isInfoEnabled()) {
- LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
- lastCheckpointedSuperstep + " from " +
- fileStatusArray[fileStatusArray.length - 1].
- getPath().toString());
+ try {
+ FileStatus[] fileStatusArray =
+ getFs().listStatus(new Path(checkpointBasePath),
+ new FinalizedCheckpointPathFilter());
+ if (fileStatusArray == null) {
+ return -1;
+ }
+ Arrays.sort(fileStatusArray);
+ lastCheckpointedSuperstep = getCheckpoint(
+ fileStatusArray[fileStatusArray.length - 1].getPath());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+ lastCheckpointedSuperstep + " from " +
+ fileStatusArray[fileStatusArray.length - 1].
+ getPath().toString());
+ }
+ } catch (IOException e) {
+ LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be " +
+ "found, killing the job.", e);
+ failJob();
}
}
+
return lastCheckpointedSuperstep;
}
@@ -1458,13 +1482,14 @@ public class BspServiceMaster<I extends
// and the master can do any final cleanup if the ZooKeeper service was
// provided (not dynamically started) and we don't want to keep the data
try {
- if (getConfiguration().get(GiraphConfiguration.ZOOKEEPER_LIST) != null &&
+ if (getConfiguration().getZookeeperList() != null &&
!getConfiguration().getBoolean(
GiraphConfiguration.KEEP_ZOOKEEPER_DATA,
GiraphConfiguration.KEEP_ZOOKEEPER_DATA_DEFAULT)) {
if (LOG.isInfoEnabled()) {
LOG.info("cleanupZooKeeper: Removing the following path " +
- "and all children - " + basePath);
+ "and all children - " + basePath + " from ZooKeeper list " +
+ getConfiguration().getZookeeperList());
}
getZkExt().deleteExt(basePath, -1, true);
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/graph/GraphMapper.java Fri Oct 5 21:37:16 2012
@@ -314,12 +314,11 @@ public class GraphMapper<I extends Writa
LOG.info("setup: classpath @ " + zkClasspath + " for job " +
context.getJobName());
}
- context.getConfiguration().set(
- GiraphConfiguration.ZOOKEEPER_JAR, zkClasspath);
+ conf.setZooKeeperJar(zkClasspath);
}
String serverPortList = conf.getZookeeperList();
if (serverPortList == null) {
- zkManager = new ZooKeeperManager(context);
+ zkManager = new ZooKeeperManager(context, conf);
context.setStatus("setup: Setting up Zookeeper manager.");
zkManager.setup();
if (zkManager.computationDone()) {
@@ -578,7 +577,18 @@ public class GraphMapper<I extends Writa
context);
}
cleanup(context);
- } catch (IOException e) {
+ // Checkstyle exception due to needing to dump ZooKeeper failure
+ // on exception
+ // CHECKSTYLE: stop IllegalCatch
+ } catch (RuntimeException e) {
+ // CHECKSTYLE: resume IllegalCatch
+ if (mapFunctions == MapFunctions.UNKNOWN ||
+ mapFunctions == MapFunctions.MASTER_ZOOKEEPER_ONLY) {
+ // ZooKeeper may have had an issue
+ if (zkManager != null) {
+ zkManager.logZooKeeperOutput(Level.WARN);
+ }
+ }
if (mapFunctions == MapFunctions.WORKER_ONLY) {
serviceWorker.failureCleanup();
}
Modified: giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java
URL: http://svn.apache.org/viewvc/giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java?rev=1394826&r1=1394825&r2=1394826&view=diff
==============================================================================
--- giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java (original)
+++ giraph/trunk/src/main/java/org/apache/giraph/zk/ZooKeeperManager.java Fri Oct 5 21:37:16 2012
@@ -21,13 +21,16 @@ package org.apache.giraph.zk;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
+import java.util.LinkedList;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.GiraphConfiguration;
+import org.apache.giraph.ImmutableClassesGiraphConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
@@ -68,7 +71,7 @@ public class ZooKeeperManager {
/** Job context (mainly for progress) */
private Mapper<?, ?, ?, ?>.Context context;
/** Hadoop configuration */
- private final Configuration conf;
+ private final ImmutableClassesGiraphConfiguration conf;
/** Task partition, to ensure uniqueness */
private final int taskPartition;
/** HDFS base directory for all file-based coordination */
@@ -96,13 +99,13 @@ public class ZooKeeperManager {
/** Thread that gets the zkProcess output */
private StreamCollector zkProcessCollector = null;
/** ZooKeeper local file system directory */
- private String zkDir = null;
+ private final String zkDir;
/** ZooKeeper config file path */
- private String configFilePath = null;
+ private final String configFilePath;
/** ZooKeeper server list */
private final Map<String, Integer> zkServerPortMap = Maps.newTreeMap();
/** ZooKeeper base port */
- private int zkBasePort = -1;
+ private final int zkBasePort;
/** Final ZooKeeper server port list (for clients) */
private String zkServerPortString;
/** My hostname */
@@ -126,13 +129,15 @@ public class ZooKeeperManager {
/**
* Constructor with context.
*
- * @param context Context to be stord internally
+ * @param context Context to be stored internally
+ * @param configuration Configuration
* @throws IOException
*/
- public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context)
+ public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context,
+ ImmutableClassesGiraphConfiguration configuration)
throws IOException {
this.context = context;
- conf = context.getConfiguration();
+ this.conf = configuration;
taskPartition = conf.getInt("mapred.task.partition", -1);
jobId = conf.get("mapred.job.id", "Unknown Job");
baseDirectory =
@@ -201,11 +206,14 @@ public class ZooKeeperManager {
* Collects the output of a stream and dumps it to the log.
*/
private static class StreamCollector extends Thread {
+ /** Number of last lines to keep */
+ private static final int LAST_LINES_COUNT = 100;
/** Class logger */
private static final Logger LOG = Logger.getLogger(StreamCollector.class);
- /** Input stream to dump */
- private final InputStream is;
-
+ /** Buffered reader of input stream */
+ private final BufferedReader bufferedReader;
+ /** Last lines (help to debug failures) */
+ private final LinkedList<String> lastLines = Lists.newLinkedList();
/**
* Constructor.
*
@@ -213,22 +221,49 @@ public class ZooKeeperManager {
*/
public StreamCollector(final InputStream is) {
super(StreamCollector.class.getName());
- this.is = is;
+ setDaemon(true);
+ InputStreamReader streamReader = new InputStreamReader(is);
+ bufferedReader = new BufferedReader(streamReader);
}
@Override
public void run() {
- InputStreamReader streamReader = new InputStreamReader(is);
- BufferedReader bufferedReader = new BufferedReader(streamReader);
+ readLines();
+ }
+
+ /**
+ * Read all the lines from the bufferedReader.
+ */
+ private synchronized void readLines() {
String line;
try {
while ((line = bufferedReader.readLine()) != null) {
+ if (lastLines.size() > LAST_LINES_COUNT) {
+ lastLines.removeFirst();
+ }
+ lastLines.add(line);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("run: " + line);
+ LOG.debug("readLines: " + line);
}
}
} catch (IOException e) {
- LOG.error("run: Ignoring IOException", e);
+ LOG.error("readLines: Ignoring IOException", e);
+ }
+ }
+
+ /**
+ * Dump the last n lines of the collector. Likely used in
+ * the case of failure.
+ *
+ * @param level Log level to dump with
+ */
+ public synchronized void dumpLastLines(Level level) {
+ // Get any remaining lines
+ readLines();
+ // Dump the lines to the screen
+ for (String line : lastLines) {
+ LOG.log(level, line);
}
}
}
@@ -527,19 +562,18 @@ public class ZooKeeperManager {
writer.write("maxClientCnxns=" +
GiraphConfiguration.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS +
"\n");
- int minSessionTimeout = conf.getInt(
- GiraphConfiguration.ZOOKEEPER_MIN_SESSION_TIMEOUT,
- GiraphConfiguration.DEFAULT_ZOOKEEPER_MIN_SESSION_TIMEOUT);
- writer.write("minSessionTimeout=" + minSessionTimeout + "\n");
+ writer.write("minSessionTimeout=" +
+ conf.getZooKeeperMinSessionTimeout() + "\n");
writer.write("maxSessionTimeout=" +
- GiraphConfiguration.DEFAULT_ZOOKEEPER_MAX_SESSION_TIMEOUT +
- "\n");
+ conf.getZooKeeperMaxSessionTimeout() + "\n");
writer.write("initLimit=" +
GiraphConfiguration.DEFAULT_ZOOKEEPER_INIT_LIMIT + "\n");
writer.write("syncLimit=" +
GiraphConfiguration.DEFAULT_ZOOKEEPER_SYNC_LIMIT + "\n");
writer.write("snapCount=" +
GiraphConfiguration.DEFAULT_ZOOKEEPER_SNAP_COUNT + "\n");
+ writer.write("forceSync=" + conf.getZooKeeperForceSync() + "\n");
+ writer.write("skipACL=" + conf.getZooKeeperSkipAcl() + "\n");
if (serverList.size() != 1) {
writer.write("electionAlg=0\n");
for (int i = 0; i < serverList.size(); ++i) {
@@ -624,17 +658,28 @@ public class ZooKeeperManager {
}
Runnable runnable = new Runnable() {
public void run() {
+ LOG.info("run: Shutdown hook started.");
synchronized (this) {
if (zkProcess != null) {
LOG.warn("onlineZooKeeperServers: " +
"Forced a shutdown hook kill of the " +
"ZooKeeper process.");
zkProcess.destroy();
+ int exitCode = -1;
+ try {
+ exitCode = zkProcess.waitFor();
+ } catch (InterruptedException e) {
+ LOG.warn("run: Couldn't get exit code.");
+ }
+ LOG.info("onlineZooKeeperServers: ZooKeeper process exited " +
+ "with " + exitCode + " (note that 143 " +
+ "typically means killed).");
}
}
}
};
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+ LOG.info("onlineZooKeeperServers: Shutdown hook added.");
} catch (IOException e) {
LOG.error("onlineZooKeeperServers: Failed to start " +
"ZooKeeper process", e);
@@ -644,7 +689,8 @@ public class ZooKeeperManager {
// Once the server is up and running, notify that this server is up
// and running by dropping a ready stamp.
int connectAttempts = 0;
- final int maxConnectAttempts = 10;
+ final int maxConnectAttempts =
+ conf.getZookeeperConnectionAttempts();
while (connectAttempts < maxConnectAttempts) {
try {
if (LOG.isInfoEnabled()) {
@@ -811,7 +857,7 @@ public class ZooKeeperManager {
}
synchronized (this) {
if (zkProcess != null) {
- int totalMapTasks = conf.getInt("mapred.map.tasks", -1);
+ int totalMapTasks = conf.getMapTasks();
waitUntilAllTasksDone(totalMapTasks);
zkProcess.destroy();
int exitValue = -1;
@@ -850,4 +896,18 @@ public class ZooKeeperManager {
return zkProcess != null;
}
}
+
+ /**
+ * Log the zookeeper output from the process (if it was started)
+ *
+ * @param level Log level to print at
+ */
+ public void logZooKeeperOutput(Level level) {
+ if (zkProcessCollector != null) {
+ LOG.log(level, "logZooKeeperOutput: Dumping up to last " +
+ StreamCollector.LAST_LINES_COUNT +
+ " lines of the ZooKeeper process STDOUT and STDERR.");
+ zkProcessCollector.dumpLastLines(level);
+ }
+ }
}