You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:42 UTC
[17/24] git commit: Added null check for anchors. Log a component's
error stream under it name.
Added null check for anchors. Log a component's error stream under it name.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8cde5508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8cde5508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8cde5508
Branch: refs/heads/master
Commit: 8cde5508b5e4828982aa4f1ff2e70ba6fd10f4b9
Parents: 5ba0938
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Fri Nov 29 14:47:31 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Fri Nov 29 14:47:31 2013 +0200
----------------------------------------------------------------------
.../jvm/backtype/storm/spout/ShellSpout.java | 8 +++---
.../src/jvm/backtype/storm/task/ShellBolt.java | 19 +++++++------
.../jvm/backtype/storm/utils/ShellProcess.java | 28 +++++++++-----------
3 files changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 6840938..bd5f84e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ public class ShellSpout implements ISpout {
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
_collector = collector;
+
_process = new ShellProcess(_command);
Number subpid = _process.launch(stormConf, context);
@@ -51,7 +52,7 @@ public class ShellSpout implements ISpout {
}
public void ack(Object msgId) {
- if (spoutMsg == null) {
+ if (spoutMsg == null) {
spoutMsg = new SpoutMsg();
}
spoutMsg.setCommand("ack");
@@ -60,7 +61,7 @@ public class ShellSpout implements ISpout {
}
public void fail(Object msgId) {
- if (spoutMsg == null) {
+ if (spoutMsg == null) {
spoutMsg = new SpoutMsg();
}
spoutMsg.setCommand("fail");
@@ -91,8 +92,7 @@ public class ShellSpout implements ISpout {
_process.writeTaskIds(outtasks);
}
} else {
- _collector.emitDirect((int) task.longValue(), stream,
- tuple, messageId);
+ _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
}
} else {
throw new RuntimeException("Unknown command received: " + command);
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 8efd904..0016d66 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -184,14 +184,17 @@ public class ShellBolt implements IBolt {
}
private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
- List<Tuple> anchors = new ArrayList<Tuple>();
- for (String anchor : shellMsg.getAnchors()) {
- Tuple t = _inputs.get(anchor);
- if (t == null) {
- throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
- }
- anchors.add(t);
- }
+ List<Tuple> anchors = new ArrayList<Tuple>();
+ List<String> recvAnchors = shellMsg.getAnchors();
+ if (recvAnchors != null) {
+ for (String anchor : recvAnchors) {
+ Tuple t = _inputs.get(anchor);
+ if (t == null) {
+ throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
+ }
+ anchors.add(t);
+ }
+ }
if(shellMsg.getTask() == 0) {
List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 8871a77..2c3b3a3 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -19,6 +19,7 @@ import org.apache.log4j.Logger;
public class ShellProcess implements Serializable {
public static Logger LOG = Logger.getLogger(ShellProcess.class);
+ public static Logger ShellLogger;
private Process _subprocess;
private InputStream processErrorStream;
private String[] command;
@@ -32,14 +33,15 @@ public class ShellProcess implements Serializable {
ProcessBuilder builder = new ProcessBuilder(command);
builder.directory(new File(context.getCodeDir()));
+ ShellLogger = Logger.getLogger(context.getThisComponentId());
+
this.serializer = getSerializer(conf);
Number pid;
try {
_subprocess = builder.start();
processErrorStream = _subprocess.getErrorStream();
- serializer.initialize(_subprocess.getOutputStream(),
- _subprocess.getInputStream());
+ serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
pid = serializer.connect(conf, context);
} catch (IOException e) {
throw new RuntimeException(
@@ -83,35 +85,29 @@ public class ShellProcess implements Serializable {
public void writeBoltMsg(BoltMsg msg) throws IOException {
serializer.writeBoltMsg(msg);
- // drain the error stream to avoid dead lock because of full error
- // stream buffer
- drainErrorStream();
+ // Log any info sent on the error stream
+ logErrorStream();
}
public void writeSpoutMsg(SpoutMsg msg) throws IOException {
serializer.writeSpoutMsg(msg);
- // drain the error stream to avoid dead lock because of full error
- // stream buffer
- drainErrorStream();
+ // Log any info sent on the error stream
+ logErrorStream();
}
public void writeTaskIds(List<Integer> taskIds) throws IOException {
serializer.writeTaskIds(taskIds);
- // drain the error stream to avoid dead lock because of full error
- // stream buffer
- drainErrorStream();
+ // Log any info sent on the error stream
+ logErrorStream();
}
- public void drainErrorStream() {
+ public void logErrorStream() {
try {
while (processErrorStream.available() > 0) {
int bufferSize = processErrorStream.available();
byte[] errorReadingBuffer = new byte[bufferSize];
-
processErrorStream.read(errorReadingBuffer, 0, bufferSize);
-
- LOG.info("Got error from shell process: "
- + new String(errorReadingBuffer));
+ ShellLogger.info(new String(errorReadingBuffer));
}
} catch (Exception e) {
}