You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:38 UTC
[07/45] storm git commit: STORM-711: modifying all connectors to use
collector.reportError instead of logging and use tuple anchoring when
emitting a tuple.
STORM-711: modifying all connectors to use collector.reportError instead of logging and use tuple anchoring when emitting a tuple.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ffb1562c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ffb1562c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ffb1562c
Branch: refs/heads/nimbus-ha-branch
Commit: ffb1562cb0eb3fbd916423136fcd3c181fbf6212
Parents: 2666d99
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Mar 18 10:15:53 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Mar 18 10:49:27 2015 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java | 2 +-
.../main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java | 4 ++--
.../java/org/apache/storm/hbase/trident/state/HBaseState.java | 2 +-
.../src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java | 2 +-
.../main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java | 2 +-
5 files changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
index f7f0886..cf29aa5 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java
@@ -62,7 +62,7 @@ public class HBaseBolt extends AbstractHBaseBolt {
try {
this.hBaseClient.batchMutate(mutations);
} catch(Exception e){
- LOG.warn("Failing tuple. Error writing rowKey " + rowKey, e);
+ this.collector.reportError(e);
this.collector.fail(tuple);
return;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
index c6838be..fd32f50 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseLookupBolt.java
@@ -67,11 +67,11 @@ public class HBaseLookupBolt extends AbstractHBaseBolt {
try {
Result result = hBaseClient.batchGet(Lists.newArrayList(get))[0];
for(Values values : rowToTupleMapper.toValues(tuple, result)) {
- this.collector.emit(values);
+ this.collector.emit(tuple, values);
}
this.collector.ack(tuple);
} catch (Exception e) {
- LOG.warn("Could not perform Lookup for rowKey =" + rowKey + " from Hbase.", e);
+ this.collector.reportError(e);
this.collector.fail(tuple);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
----------------------------------------------------------------------
diff --git a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
index 71ab7c4..04518ca 100644
--- a/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
+++ b/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java
@@ -141,7 +141,7 @@ public class HBaseState implements State {
try {
hBaseClient.batchMutate(mutations);
} catch (Exception e) {
- LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+ collector.reportError(e);
throw new FailedException(e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index a416357..dcb09e7 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -112,7 +112,7 @@ public class HdfsBolt extends AbstractHdfsBolt{
this.rotationPolicy.reset();
}
} catch (IOException e) {
- LOG.warn("write/sync failed.", e);
+ this.collector.reportError(e);
this.collector.fail(tuple);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/ffb1562c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index fc9bb4f..baf4df0 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -123,7 +123,7 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
this.rotationPolicy.reset();
}
} catch (IOException e) {
- LOG.warn("write/sync failed.", e);
+ this.collector.reportError(e);
this.collector.fail(tuple);
}