You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/12/02 07:16:23 UTC

[1/2] storm git commit: STORM-3292: flush writers in HiveState when the trident batch commits

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 8eac25b34 -> e6828f25f


STORM-3292: flush writers in HiveState when the trident batch commits


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/954ebe2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/954ebe2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/954ebe2c

Branch: refs/heads/1.x-branch
Commit: 954ebe2c059eb0f21888224935fdfe3d7d653f4f
Parents: 8eac25b
Author: Arun Mahadevan <ar...@apache.org>
Authored: Mon Nov 26 18:15:58 2018 -0800
Committer: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Committed: Sun Dec 2 15:53:58 2018 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hive/trident/HiveState.java    | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/954ebe2c/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
index 10b3591..a4cefbe 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
@@ -71,6 +71,13 @@ public class HiveState implements State {
 
     @Override
     public void commit(Long txId) {
+        try {
+            flushAllWriters();
+            currentBatchSize = 0;
+        } catch (HiveWriter.TxnFailure | InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure ex) {
+            LOG.warn("Commit failed. Failing the batch.", ex);
+            throw new FailedException(ex);
+        }
     }
 
     public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions)  {


[2/2] storm git commit: STORM-3292: log an error message if tick tuple interval is set in HiveOptions for Trident

Posted by ka...@apache.org.
STORM-3292: log an error message if tick tuple interval is set in HiveOptions for Trident


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e6828f25
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e6828f25
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e6828f25

Branch: refs/heads/1.x-branch
Commit: e6828f25f1eea9bda78131acf12ad448e46bdded
Parents: 954ebe2
Author: Arun Mahadevan <ar...@apache.org>
Authored: Tue Nov 27 10:30:28 2018 -0800
Committer: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
Committed: Sun Dec 2 16:02:56 2018 +0900

----------------------------------------------------------------------
 .../main/java/org/apache/storm/hive/common/HiveOptions.java | 2 +-
 .../org/apache/storm/hive/trident/HiveStateFactory.java     | 9 ++++++++-
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e6828f25/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
index e80f5d7..c1b56e8 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
@@ -26,7 +26,7 @@ public class HiveOptions implements Serializable {
     /**
      * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
      */
-    private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
+    public static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
 
     protected HiveMapper mapper;
     protected String databaseName;

http://git-wip-us.apache.org/repos/asf/storm/blob/e6828f25/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
index d6e3c71..66458e1 100644
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
@@ -34,7 +34,14 @@ public class HiveStateFactory implements StateFactory {
 
     public HiveStateFactory(){}
 
-    public HiveStateFactory withOptions(HiveOptions options){
+    /**
+     * The options for connecting to Hive.
+     */
+    public HiveStateFactory withOptions(HiveOptions options) {
+        if (options.getTickTupleInterval() != HiveOptions.DEFAULT_TICK_TUPLE_INTERVAL_SECS) {
+            LOG.error("Tick tuple interval will be ignored for trident."
+                    + " The Hive writers are flushed after each batch.");
+        }
         this.options = options;
         return this;
     }