You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/12/02 07:16:23 UTC
[1/2] storm git commit: STORM-3292: flush writers in HiveState when
the trident batch commits
Repository: storm
Updated Branches:
refs/heads/1.x-branch 8eac25b34 -> e6828f25f
STORM-3292: flush writers in HiveState when the trident batch commits
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/954ebe2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/954ebe2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/954ebe2c
Branch: refs/heads/1.x-branch
Commit: 954ebe2c059eb0f21888224935fdfe3d7d653f4f
Parents: 8eac25b
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Nov 26 18:15:58 2018 -0800
Committer: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Committed: Sun Dec 2 15:53:58 2018 +0900
----------------------------------------------------------------------
.../main/java/org/apache/storm/hive/trident/HiveState.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/954ebe2c/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index 10b3591..a4cefbe 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -71,6 +71,13 @@ public class HiveState implements State {
@Override
public void commit(Long txId) {
+ try {
+ flushAllWriters();
+ currentBatchSize = 0;
+ } catch (HiveWriter.TxnFailure | InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure ex) {
+ LOG.warn("Commit failed. Failing the batch.", ex);
+ throw new FailedException(ex);
+ }
}
public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
[2/2] storm git commit: STORM-3292: log an error message if tick
tuple interval is set in HiveOptions for Trident
Posted by ka...@apache.org.
STORM-3292: log an error message if tick tuple interval is set in HiveOptions for Trident
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6828f25
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6828f25
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6828f25
Branch: refs/heads/1.x-branch
Commit: e6828f25f1eea9bda78131acf12ad448e46bdded
Parents: 954ebe2
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Nov 27 10:30:28 2018 -0800
Committer: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Committed: Sun Dec 2 16:02:56 2018 +0900
----------------------------------------------------------------------
.../main/java/org/apache/storm/hive/common/HiveOptions.java | 2 +-
.../org/apache/storm/hive/trident/HiveStateFactory.java | 9 ++++++++-
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e6828f25/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index e80f5d7..c1b56e8 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -26,7 +26,7 @@ public class HiveOptions implements Serializable {
/**
* Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
*/
- private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+ public static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
protected HiveMapper mapper;
protected String databaseName;
http://git-wip-us.apache.org/repos/asf/storm/blob/e6828f25/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index d6e3c71..66458e1 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -34,7 +34,14 @@ public class HiveStateFactory implements StateFactory {
public HiveStateFactory(){}
- public HiveStateFactory withOptions(HiveOptions options){
+ /**
+ * The options for connecting to Hive.
+ */
+ public HiveStateFactory withOptions(HiveOptions options) {
+ if (options.getTickTupleInterval() != HiveOptions.DEFAULT_TICK_TUPLE_INTERVAL_SECS) {
+ LOG.error("Tick tuple interval will be ignored for trident."
+ + " The Hive writers are flushed after each batch.");
+ }
this.options = options;
return this;
}