You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:46 UTC

[28/40] storm git commit: STORM-2527 Initialize java.sql.DriverManager earlier to avoid deadlock

STORM-2527 Initialize java.sql.DriverManager earlier to avoid deadlock

* also fixed some checkstyle violations


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a55a7df
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a55a7df
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a55a7df

Branch: refs/heads/1.1.x-branch
Commit: 4a55a7dfe8814a00606170c12ef62a60bd9d74ad
Parents: db7bcc3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue May 23 19:20:15 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:53:03 2017 +0900

----------------------------------------------------------------------
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       | 68 +++++++++++++++-----
 1 file changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4a55a7df/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 9cfbb4e..6258a97 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -15,25 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.jdbc.bolt;
 
-import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
+import java.sql.DriverManager;
+import java.util.Map;
 import org.apache.commons.lang.Validate;
+import org.apache.storm.Config;
 import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class);
+    private static final Logger LOG = LoggerFactory.getLogger(
+            AbstractJdbcBolt.class);
 
     protected OutputCollector collector;
 
@@ -42,24 +41,61 @@ public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt {
     protected Integer queryTimeoutSecs;
     protected ConnectionProvider connectionProvider;
 
+    static {
+        /*
+         * Load DriverManager first to avoid any race condition between the
+         * static initialization block of DriverManager and that of a specific
+         * driver class (e.g. PhoenixDriver).
+         *
+         * We need this workaround because, although the prepare() method is
+         * synchronized, a worker can initialize multiple AbstractJdbcBolt
+         * instances and they would still race with each other.
+         *
+         * We just need to ensure that the DriverManager class is always
+         * initialized before the provider, so the line below must run before
+         * the provider is initialized.
+         */
+        DriverManager.getDrivers();
+    }
+
+    /**
+     * Subclasses should call this to ensure that the output collector and
+     * connection provider are set up and jdbcClient is initialized properly.
+     * <p/>
+     * {@inheritDoc}
+     */
     @Override
-    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
-        this.collector = collector;
+    public void prepare(final Map map, final TopologyContext topologyContext,
+                        final OutputCollector outputCollector) {
+        this.collector = outputCollector;
 
         connectionProvider.prepare();
 
-        if(queryTimeoutSecs == null) {
-            queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+        if (queryTimeoutSecs == null) {
+            String msgTimeout = map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
+                    .toString();
+            queryTimeoutSecs = Integer.parseInt(msgTimeout);
         }
 
         this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
     }
 
-    public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
-        Validate.notNull(connectionProvider);
-        this.connectionProvider = connectionProvider;
+    /**
+     * Constructor.
+     * <p/>
+     * @param connectionProviderParam database connection provider
+     */
+    public AbstractJdbcBolt(final ConnectionProvider connectionProviderParam) {
+        Validate.notNull(connectionProviderParam);
+        this.connectionProvider = connectionProviderParam;
     }
 
+    /**
+     * Cleanup.
+     * <p/>
+     * Subclasses should call this to ensure the connection provider is
+     * also cleaned up.
+     */
     @Override
     public void cleanup() {
         connectionProvider.cleanup();