You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:38 UTC

[07/45] storm git commit: STORM-711: modifying all connectors to use collector.reportError instead of logging and use tuple anchoring when emitting a tuple.

STORM-711: modifying all connectors to use collector.reportError instead of logging and use tuple anchoring when emitting a tuple.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffb1562c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffb1562c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffb1562c

Branch: refs/heads/nimbus-ha-branch
Commit: ffb1562cb0eb3fbd916423136fcd3c181fbf6212
Parents: 2666d99
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Mar 18 10:15:53 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Mar 18 10:49:27 2015 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java     | 2 +-
 .../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java   | 4 ++--
 .../java/org/apache/storm/hbase/trident/state/HBaseState.java    | 2 +-
 .../src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java       | 2 +-
 .../main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java   | 2 +-
 5 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index f7f0886..cf29aa5 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -62,7 +62,7 @@ public class HBaseBolt  extends AbstractHBaseBolt {
         try {
             this.hBaseClient.batchMutate(mutations);
         } catch(Exception e){
-            LOG.warn("Failing tuple. Error writing rowKey " + rowKey, e);
+            this.collector.reportError(e);
             this.collector.fail(tuple);
             return;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index c6838be..fd32f50 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -67,11 +67,11 @@ public class HBaseLookupBolt extends AbstractHBaseBolt {
         try {
             Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
             for(Values values : rowToTupleMapper.toValues(tuple, result)) {
-                this.collector.emit(values);
+                this.collector.emit(tuple, values);
             }
             this.collector.ack(tuple);
         } catch (Exception e) {
-            LOG.warn("Could not perform Lookup for rowKey =" + rowKey + " from Hbase.", e);
+            this.collector.reportError(e);
             this.collector.fail(tuple);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 71ab7c4..04518ca 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -141,7 +141,7 @@ public class HBaseState implements State {
         try {
             hBaseClient.batchMutate(mutations);
         } catch (Exception e) {
-            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            collector.reportError(e);
             throw new FailedException(e);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index a416357..dcb09e7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -112,7 +112,7 @@ public class HdfsBolt extends AbstractHdfsBolt{
                 this.rotationPolicy.reset();
             }
         } catch (IOException e) {
-            LOG.warn("write/sync failed.", e);
+            this.collector.reportError(e);
             this.collector.fail(tuple);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index fc9bb4f..baf4df0 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -123,7 +123,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
                 this.rotationPolicy.reset();
             }
         } catch (IOException e) {
-            LOG.warn("write/sync failed.", e);
+            this.collector.reportError(e);
             this.collector.fail(tuple);
         }