You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:46 UTC
[28/40] storm git commit: STORM-2527 Initialize
java.sql.DriverManager earlier to avoid deadlock
STORM-2527 Initialize java.sql.DriverManager earlier to avoid deadlock
* also fixed some checkstyle violations
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4a55a7df
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4a55a7df
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4a55a7df
Branch: refs/heads/1.1.x-branch
Commit: 4a55a7dfe8814a00606170c12ef62a60bd9d74ad
Parents: db7bcc3
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue May 23 19:20:15 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:53:03 2017 +0900
----------------------------------------------------------------------
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 68 +++++++++++++++-----
1 file changed, 52 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4a55a7df/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 9cfbb4e..6258a97 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -15,25 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.jdbc.bolt;
-import org.apache.storm.Config;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.base.BaseRichBolt;
+import java.sql.DriverManager;
+import java.util.Map;
import org.apache.commons.lang.Validate;
+import org.apache.storm.Config;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class);
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractJdbcBolt.class);
protected OutputCollector collector;
@@ -42,24 +41,61 @@ public abstract class AbstractJdbcBolt extends BaseTickTupleAwareRichBolt {
protected Integer queryTimeoutSecs;
protected ConnectionProvider connectionProvider;
+ static {
+ /*
+ * Load DriverManager first to avoid any race condition between
+ * DriverManager static initialization block and specific driver class's
+ * static initialization block. e.g. PhoenixDriver
+ *
+ * We should take this workaround since prepare() method is synchronized
+ * but an worker can initialize multiple AbstractJdbcBolt instances and
+ * they would make race condition.
+ *
+ * We just need to ensure that DriverManager class is always initialized
+ * earlier than provider so below line should be called first
+ * than initializing provider.
+ */
+ DriverManager.getDrivers();
+ }
+
+ /**
+ * Subclasses should call this to ensure output collector and connection
+ * provider are set up, and finally jdbcClient is initialized properly.
+ * <p/>
+ * {@inheritDoc}
+ */
@Override
- public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
- this.collector = collector;
+ public void prepare(final Map map, final TopologyContext topologyContext,
+ final OutputCollector outputCollector) {
+ this.collector = outputCollector;
connectionProvider.prepare();
- if(queryTimeoutSecs == null) {
- queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+ if (queryTimeoutSecs == null) {
+ String msgTimeout = map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)
+ .toString();
+ queryTimeoutSecs = Integer.parseInt(msgTimeout);
}
this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
}
- public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
- Validate.notNull(connectionProvider);
- this.connectionProvider = connectionProvider;
+ /**
+ * Constructor.
+ * <p/>
+ * @param connectionProviderParam database connection provider
+ */
+ public AbstractJdbcBolt(final ConnectionProvider connectionProviderParam) {
+ Validate.notNull(connectionProviderParam);
+ this.connectionProvider = connectionProviderParam;
}
+ /**
+ * Cleanup.
+ * <p/>
+ * Subclasses should call this to ensure connection provider can be
+ * also cleaned up.
+ */
@Override
public void cleanup() {
connectionProvider.cleanup();