You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/18 13:53:21 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1653 Allow database tables to be created externally

ARTEMIS-1653 Allow database tables to be created externally

(cherry picked from commit c7651853cdb291dfa3bd2906e1e082fd06cff612)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0da630be
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0da630be
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0da630be

Branch: refs/heads/1.x
Commit: 0da630be500ff90db091f2014cba7e3ac97db38d
Parents: af15fc4
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Apr 3 10:11:04 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 18 09:53:17 2018 -0400

----------------------------------------------------------------------
 .../jdbc/store/drivers/AbstractJDBCDriver.java  |  59 ++++++++--
 .../impl/jdbc/JdbcSharedStateManagerTest.java   | 108 +++++++++++++++++++
 .../core/server/impl/jdbc/TestJDBCDriver.java   |  29 ++++-
 .../jdbc/store/journal/JDBCJournalTest.java     |  53 ++++++---
 4 files changed, 221 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0da630be/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
index b0be3ae..c7a944e 100644
--- a/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
+++ b/artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/drivers/AbstractJDBCDriver.java
@@ -179,25 +179,62 @@ public abstract class AbstractJDBCDriver {
       logger.tracef("Validating if table %s didn't exist before creating", tableName);
       try {
          connection.setAutoCommit(false);
+         final boolean tableExists;
          try (ResultSet rs = connection.getMetaData().getTables(null, null, tableName, null)) {
-            if (rs != null && !rs.next()) {
+            if ((rs == null) || (rs != null && !rs.next())) {
+               tableExists = false;
                if (logger.isTraceEnabled()) {
                   logger.tracef("Table %s did not exist, creating it with SQL=%s", tableName, Arrays.toString(sqls));
                }
-               final SQLWarning sqlWarning = rs.getWarnings();
-               if (sqlWarning != null) {
-                  logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
+               if (rs != null) {
+                  final SQLWarning sqlWarning = rs.getWarnings();
+                  if (sqlWarning != null) {
+                     logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), sqlWarning));
+                  }
                }
             } else {
-               try (Statement statement = connection.createStatement();
-                     ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
-                  if (rs.next() && rs.getInt(1) > 0) {
-                     logger.tracef("Table %s did exist but is not empty. Skipping initialization.", tableName);
+               tableExists = true;
+            }
+         }
+         if (tableExists) {
+            logger.tracef("Validating if the existing table %s is initialized or not", tableName);
+            try (Statement statement = connection.createStatement();
+                 ResultSet cntRs = statement.executeQuery(sqlProvider.getCountJournalRecordsSQL())) {
+               logger.tracef("Validation of the existing table %s initialization is started", tableName);
+               int rows;
+               if (cntRs.next() && (rows = cntRs.getInt(1)) > 0) {
+                  logger.tracef("Table %s did exist but is not empty. Skipping initialization. Found %d rows.", tableName, rows);
+                  if (logger.isDebugEnabled()) {
+                     final long expectedRows = Stream.of(sqls).map(String::toUpperCase).filter(sql -> sql.contains("INSERT INTO")).count();
+                     if (rows < expectedRows) {
+                        logger.debug("Table " + tableName + " was expected to contain " + expectedRows + " rows while it has " + rows + " rows.");
+                     }
+                  }
+                  connection.commit();
+                  return;
+               } else {
+                  sqls = Stream.of(sqls).filter(sql -> {
+                     final String upperCaseSql = sql.toUpperCase();
+                     return !(upperCaseSql.contains("CREATE TABLE") || upperCaseSql.contains("CREATE INDEX"));
+                  }).toArray(String[]::new);
+                  if (sqls.length > 0) {
+                     logger.tracef("Table %s did exist but is empty. Starting initialization.", tableName);
                   } else {
-                     sqls = Arrays.copyOfRange(sqls, 1, sqls.length);
+                     logger.tracef("Table %s did exist but is empty. Initialization completed: no initialization statements left.", tableName);
                   }
                }
+            } catch (SQLException e) {
+               logger.warn(JDBCUtils.appendSQLExceptionDetails(new StringBuilder("Can't verify the initialization of table ").append(tableName).append(" due to:"), e, sqlProvider.getCountJournalRecordsSQL()));
+               try {
+                  connection.rollback();
+               } catch (SQLException rollbackEx) {
+                  logger.debug("Rollback failed while validating initialization of a table", rollbackEx);
+               }
+               connection.setAutoCommit(false);
+               logger.tracef("Table %s seems to exist, but we can't verify the initialization. Keep trying to create and initialize.", tableName);
             }
+         }
+         if (sqls.length > 0) {
             try (Statement statement = connection.createStatement()) {
                for (String sql : sqls) {
                   statement.executeUpdate(sql);
@@ -207,9 +244,9 @@ public abstract class AbstractJDBCDriver {
                   }
                }
             }
-         }
 
-         connection.commit();
+            connection.commit();
+         }
       } catch (SQLException e) {
          final String sqlStatements = Stream.of(sqls).collect(Collectors.joining("\n"));
          logger.error(JDBCUtils.appendSQLExceptionDetails(new StringBuilder(), e, sqlStatements));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0da630be/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
new file mode 100644
index 0000000..e7ac316
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManagerTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server.impl.jdbc;
+
+import java.util.UUID;
+
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
+import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JdbcSharedStateManagerTest extends ActiveMQTestBase {
+
+   private DatabaseStorageConfiguration dbConf;
+   private SQLProvider sqlProvider;
+
+   @Before
+   public void configure() {
+      dbConf = createDefaultDatabaseStorageConfiguration();
+      sqlProvider = JDBCUtils.getSQLProvider(
+         dbConf.getJdbcDriverClassName(),
+         dbConf.getNodeManagerStoreTableName(),
+         SQLProvider.DatabaseStoreType.NODE_MANAGER);
+   }
+
+   private TestJDBCDriver createFakeDriver(boolean initializeTable) {
+      return TestJDBCDriver.usingConnectionUrl(
+         dbConf.getJdbcConnectionUrl(),
+         dbConf.getJdbcDriverClassName(),
+         sqlProvider,
+         initializeTable);
+   }
+
+   private JdbcSharedStateManager createSharedStateManager() {
+      return JdbcSharedStateManager.usingConnectionUrl(
+         UUID.randomUUID().toString(),
+         dbConf.getJdbcLockExpirationMillis(),
+         dbConf.getJdbcMaxAllowedMillisFromDbTime(),
+         dbConf.getJdbcConnectionUrl(),
+         dbConf.getJdbcDriverClassName(),
+         sqlProvider);
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableNotExist() throws Exception {
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      try {
+         sharedStateManager.destroy();
+      } finally {
+         sharedStateManager.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableExistEmpty() throws Exception {
+      final TestJDBCDriver fakeDriver = createFakeDriver(false);
+      fakeDriver.start();
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      sharedStateManager.stop();
+      try {
+         fakeDriver.destroy();
+      } finally {
+         fakeDriver.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartIfTableExistInitialized() throws Exception {
+      final TestJDBCDriver fakeDriver = createFakeDriver(true);
+      fakeDriver.start();
+      final JdbcSharedStateManager sharedStateManager = createSharedStateManager();
+      sharedStateManager.stop();
+      try {
+         fakeDriver.destroy();
+      } finally {
+         fakeDriver.stop();
+      }
+   }
+
+   @Test(timeout = 10000)
+   public void shouldStartTwoIfTableNotExist() throws Exception {
+      final JdbcSharedStateManager liveSharedStateManager = createSharedStateManager();
+      final JdbcSharedStateManager backupSharedStateManager = createSharedStateManager();
+      backupSharedStateManager.stop();
+      try {
+         liveSharedStateManager.destroy();
+      } finally {
+         liveSharedStateManager.stop();
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0da630be/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
index 52b497a..2df6274 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/TestJDBCDriver.java
@@ -20,21 +20,33 @@ import java.sql.SQLException;
 
 import org.apache.activemq.artemis.jdbc.store.drivers.AbstractJDBCDriver;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
+import org.junit.Assert;
 
 public class TestJDBCDriver extends AbstractJDBCDriver {
 
+   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
+                                                   String jdbcDriverClass,
+                                                   SQLProvider provider) {
+      return usingConnectionUrl(jdbcConnectionUrl, jdbcDriverClass, provider, false);
+   }
 
-   public static TestJDBCDriver usingConnectionUrl(
-         String jdbcConnectionUrl,
-         String jdbcDriverClass,
-         SQLProvider provider) {
-      TestJDBCDriver driver = new TestJDBCDriver();
+   public static TestJDBCDriver usingConnectionUrl(String jdbcConnectionUrl,
+                                                   String jdbcDriverClass,
+                                                   SQLProvider provider,
+                                                   boolean initialize) {
+      TestJDBCDriver driver = new TestJDBCDriver(initialize);
       driver.setSqlProvider(provider);
       driver.setJdbcConnectionUrl(jdbcConnectionUrl);
       driver.setJdbcDriverClass(jdbcDriverClass);
       return driver;
    }
 
+   private boolean initialize;
+
+   private TestJDBCDriver(boolean initialize) {
+      this.initialize = initialize;
+   }
+
    @Override
    protected void prepareStatements() throws SQLException {
    }
@@ -43,7 +55,14 @@ public class TestJDBCDriver extends AbstractJDBCDriver {
    protected void createSchema() throws SQLException {
       try {
          connection.createStatement().execute(sqlProvider.createNodeManagerStoreTableSQL());
+         if (initialize) {
+            connection.createStatement().execute(sqlProvider.createNodeIdSQL());
+            connection.createStatement().execute(sqlProvider.createStateSQL());
+            connection.createStatement().execute(sqlProvider.createLiveLockSQL());
+            connection.createStatement().execute(sqlProvider.createBackupLockSQL());
+         }
       } catch (SQLException e) {
+         Assert.fail(e.getMessage());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0da630be/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
index f5cf6f0..765f38c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jdbc/store/journal/JDBCJournalTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.jdbc.store.journal;
 
 import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -26,40 +27,38 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
 import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
+import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
 import org.apache.activemq.artemis.jdbc.store.journal.JDBCJournalImpl;
-import org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider;
 import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ThreadLeakCheckRule;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
-import static org.apache.activemq.artemis.jdbc.store.sql.PropertySQLProvider.Factory.SQLDialect.DERBY;
-
 public class JDBCJournalTest extends ActiveMQTestBase {
 
    @Rule
    public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
 
-   private static final String JOURNAL_TABLE_NAME = "MESSAGE_JOURNAL";
-
-   private static final String DRIVER_CLASS = "org.apache.derby.jdbc.EmbeddedDriver";
-
    private JDBCJournalImpl journal;
 
-   private String jdbcUrl;
-
    private ScheduledExecutorService scheduledExecutorService;
 
    private ExecutorService executorService;
 
+   private SQLProvider sqlProvider;
+
+   private DatabaseStorageConfiguration dbConf;
+
    @After
    @Override
    public void tearDown() throws Exception {
@@ -77,11 +76,14 @@ public class JDBCJournalTest extends ActiveMQTestBase {
 
    @Before
    public void setup() throws Exception {
+      dbConf = createDefaultDatabaseStorageConfiguration();
+      sqlProvider = JDBCUtils.getSQLProvider(
+         dbConf.getJdbcDriverClassName(),
+         dbConf.getMessageTableName(),
+         SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL);
       scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
       executorService = Executors.newSingleThreadExecutor();
-      jdbcUrl = "jdbc:derby:target/data;create=true";
-      SQLProvider.Factory factory = new PropertySQLProvider.Factory(DERBY);
-      journal = new JDBCJournalImpl(jdbcUrl, DRIVER_CLASS, factory.create(JOURNAL_TABLE_NAME, SQLProvider.DatabaseStoreType.MESSAGE_JOURNAL), scheduledExecutorService, executorService, new IOCriticalErrorListener() {
+      journal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(), dbConf.getJdbcDriverClassName(), sqlProvider, scheduledExecutorService, executorService, new IOCriticalErrorListener() {
          @Override
          public void onIOException(Throwable code, String message, SequentialFile file) {
 
@@ -91,6 +93,33 @@ public class JDBCJournalTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testRestartEmptyJournal() throws SQLException {
+      Assert.assertTrue(journal.isStarted());
+      Assert.assertEquals(0, journal.getNumberOfRecords());
+      journal.stop();
+      journal.start();
+      Assert.assertTrue(journal.isStarted());
+   }
+
+   @Test
+   public void testConcurrentEmptyJournal() throws SQLException {
+      Assert.assertTrue(journal.isStarted());
+      Assert.assertEquals(0, journal.getNumberOfRecords());
+      final JDBCJournalImpl secondJournal = new JDBCJournalImpl(dbConf.getJdbcConnectionUrl(),
+                                                                          dbConf.getJdbcDriverClassName(),
+                                                                          sqlProvider, scheduledExecutorService,
+                                                                          executorService, (code, message, file) -> {
+         Assert.fail(message);
+      });
+      secondJournal.start();
+      try {
+         Assert.assertTrue(secondJournal.isStarted());
+      } finally {
+         secondJournal.stop();
+      }
+   }
+
+   @Test
    public void testInsertRecords() throws Exception {
       int noRecords = 10;
       for (int i = 0; i < noRecords; i++) {