You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:42 UTC

[17/24] git commit: Added null check for anchors. Log a component's error stream under it name.

Added null check for anchors. Log a component's error stream under it name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8cde5508
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8cde5508
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8cde5508

Branch: refs/heads/master
Commit: 8cde5508b5e4828982aa4f1ff2e70ba6fd10f4b9
Parents: 5ba0938
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Fri Nov 29 14:47:31 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Fri Nov 29 14:47:31 2013 +0200

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    |  8 +++---
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 19 +++++++------
 .../jvm/backtype/storm/utils/ShellProcess.java  | 28 +++++++++-----------
 3 files changed, 27 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 6840938..bd5f84e 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -31,6 +31,7 @@ public class ShellSpout implements ISpout {
     public void open(Map stormConf, TopologyContext context,
                      SpoutOutputCollector collector) {
         _collector = collector;
+
         _process = new ShellProcess(_command);
 
         Number subpid = _process.launch(stormConf, context);
@@ -51,7 +52,7 @@ public class ShellSpout implements ISpout {
     }
 
     public void ack(Object msgId) {
-    	if (spoutMsg == null) {
+        if (spoutMsg == null) {
             spoutMsg = new SpoutMsg();
         }
         spoutMsg.setCommand("ack");
@@ -60,7 +61,7 @@ public class ShellSpout implements ISpout {
     }
 
     public void fail(Object msgId) {
-    	if (spoutMsg == null) {
+        if (spoutMsg == null) {
             spoutMsg = new SpoutMsg();
         }
         spoutMsg.setCommand("fail");
@@ -91,8 +92,7 @@ public class ShellSpout implements ISpout {
                             _process.writeTaskIds(outtasks);
                         }
                     } else {
-                        _collector.emitDirect((int) task.longValue(), stream,
-                                tuple, messageId);
+                        _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
                     }
                 } else {
                     throw new RuntimeException("Unknown command received: " + command);

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 8efd904..0016d66 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -184,14 +184,17 @@ public class ShellBolt implements IBolt {
     }
 
     private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
-    	List<Tuple> anchors = new ArrayList<Tuple>();
-    	for (String anchor : shellMsg.getAnchors()) {
-	    	Tuple t = _inputs.get(anchor);
-	        if (t == null) {
-	            throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
-	        }
-	        anchors.add(t);
-    	}
+        List<Tuple> anchors = new ArrayList<Tuple>();
+        List<String> recvAnchors = shellMsg.getAnchors();
+        if (recvAnchors != null) {
+            for (String anchor : recvAnchors) {
+                Tuple t = _inputs.get(anchor);
+                if (t == null) {
+                    throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
+                }
+                anchors.add(t);
+            }
+        }
 
         if(shellMsg.getTask() == 0) {
             List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8cde5508/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 8871a77..2c3b3a3 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -19,6 +19,7 @@ import org.apache.log4j.Logger;
 
 public class ShellProcess implements Serializable {
     public static Logger LOG = Logger.getLogger(ShellProcess.class);
+    public static Logger ShellLogger;
     private Process      _subprocess;
     private InputStream  processErrorStream;
     private String[]     command;
@@ -32,14 +33,15 @@ public class ShellProcess implements Serializable {
         ProcessBuilder builder = new ProcessBuilder(command);
         builder.directory(new File(context.getCodeDir()));
 
+        ShellLogger = Logger.getLogger(context.getThisComponentId());
+
         this.serializer = getSerializer(conf);
 
         Number pid;
         try {
             _subprocess = builder.start();
             processErrorStream = _subprocess.getErrorStream();
-            serializer.initialize(_subprocess.getOutputStream(),
-                    _subprocess.getInputStream());
+            serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
             pid = serializer.connect(conf, context);
         } catch (IOException e) {
             throw new RuntimeException(
@@ -83,35 +85,29 @@ public class ShellProcess implements Serializable {
 
     public void writeBoltMsg(BoltMsg msg) throws IOException {
         serializer.writeBoltMsg(msg);
-        // drain the error stream to avoid dead lock because of full error
-        // stream buffer
-        drainErrorStream();
+        // Log any info sent on the error stream
+        logErrorStream();
     }
 
     public void writeSpoutMsg(SpoutMsg msg) throws IOException {
         serializer.writeSpoutMsg(msg);
-        // drain the error stream to avoid dead lock because of full error
-        // stream buffer
-        drainErrorStream();
+        // Log any info sent on the error stream
+        logErrorStream();
     }
 
     public void writeTaskIds(List<Integer> taskIds) throws IOException {
         serializer.writeTaskIds(taskIds);
-        // drain the error stream to avoid dead lock because of full error
-        // stream buffer
-        drainErrorStream();
+        // Log any info sent on the error stream
+        logErrorStream();
     }
 
-    public void drainErrorStream() {
+    public void logErrorStream() {
         try {
             while (processErrorStream.available() > 0) {
                 int bufferSize = processErrorStream.available();
                 byte[] errorReadingBuffer = new byte[bufferSize];
-
                 processErrorStream.read(errorReadingBuffer, 0, bufferSize);
-
-                LOG.info("Got error from shell process: "
-                        + new String(errorReadingBuffer));
+                ShellLogger.info(new String(errorReadingBuffer));
             }
         } catch (Exception e) {
         }