You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/09 20:58:16 UTC

[08/20] phoenix git commit: PHOENIX-4335 System catalog snapshot created each time a new connection is created

PHOENIX-4335 System catalog snapshot created each time a new connection is created


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2a67137d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2a67137d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2a67137d

Branch: refs/heads/5.x-HBase-2.0
Commit: 2a67137db102af7bf7adf6a42863b110343f64d3
Parents: 79d80f1
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Oct 31 15:55:03 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Thu Nov 9 12:43:41 2017 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/SystemCatalogUpgradeIT.java | 121 +++++++++++++++++++
 .../phoenix/coprocessor/MetaDataProtocol.java   |  12 +-
 .../query/ConnectionQueryServicesImpl.java      |  39 ++++--
 .../java/org/apache/phoenix/query/BaseTest.java |  35 ++++--
 4 files changed, 190 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a67137d/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
new file mode 100644
index 0000000..e5b1d6e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/** Checks that re-initializing the shared query services flags a required upgrade at most once. */
+public class SystemCatalogUpgradeIT extends BaseTest {
+    private static boolean reinitialize; // while true, isInitialized() reports false so init() runs again
+    private static int countUpgradeAttempts; // incremented on every setUpgradeRequired() call
+    private static long systemTableVersion = MetaDataProtocol.getPriorVersion(); // returned by getSystemTableVersion()
+    // Query services that count upgrade requests and let the test choose the system table version
+    private static class PhoenixUpgradeCountingServices extends ConnectionQueryServicesImpl {
+        public PhoenixUpgradeCountingServices(QueryServices services, ConnectionInfo connectionInfo, Properties info) {
+            super(services, connectionInfo, info);
+        }
+        
+        @Override
+        protected void setUpgradeRequired() {
+            super.setUpgradeRequired();
+            countUpgradeAttempts++;
+        }
+        
+        @Override
+        protected long getSystemTableVersion() {
+            return systemTableVersion;
+        }
+        
+        @Override
+        protected boolean isInitialized() {
+            return !reinitialize && super.isInitialized();
+        }
+    }
+    // Driver that accepts every URL and hands out one shared PhoenixUpgradeCountingServices instance
+    public static class PhoenixUpgradeCountingDriver extends PhoenixTestDriver {
+        private ConnectionQueryServices cqs;
+        private final ReadOnlyProps overrideProps;
+        
+        public PhoenixUpgradeCountingDriver(ReadOnlyProps props) {
+            overrideProps = props;
+        }
+        
+        @Override
+        public boolean acceptsURL(String url) throws SQLException {
+            return true;
+        }
+        
+        @Override // public for testing
+        public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+            if (cqs == null) {
+                cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url), info);
+                cqs.init(url, info);
+            } else if (reinitialize) {
+                cqs.init(url, info);
+                reinitialize = false;
+            }
+            return cqs;
+        }
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixUpgradeCountingDriver.class.getName());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testUpgradeOnlyHappensOnce() throws Exception {
+        try (PhoenixConnection conn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
+            assertTrue(conn.getQueryServices() instanceof PhoenixUpgradeCountingServices);
+        }
+        // Check if the timestamp version is changing between the current version and prior version
+        boolean wasTimestampChanged = systemTableVersion != MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+        reinitialize = true;
+        systemTableVersion = MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+        DriverManager.getConnection(getUrl()).close(); // only the side effect of connecting matters
+        // Confirm that if the timestamp changed, that an upgrade was performed (and that if it
+        // didn't, that an upgrade wasn't attempted).
+        assertEquals(wasTimestampChanged ? 1 : 0, countUpgradeAttempts);
+        // Confirm that another connection does not increase the number of times upgrade was attempted
+        DriverManager.getConnection(getUrl()).close(); // closed right away so no connection leaks
+        assertEquals(wasTimestampChanged ? 1 : 0, countUpgradeAttempts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a67137d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 655068d..c4ecc3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
@@ -89,7 +90,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0 = MIN_TABLE_TIMESTAMP + 27;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_TABLE_TIMESTAMP + 28;
+    // Since there's no upgrade code, keep the version the same as the previous version
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0;
     
@@ -431,6 +433,14 @@ public abstract class MetaDataProtocol extends MetaDataService {
         }
     }
   
+    /**
+     * Returns the first key of TIMESTAMP_VERSION_MAP in descending order, or -1 if the map is empty.
+     */
+    public static long getPriorVersion() {
+        Iterator<Long> newestFirst = TIMESTAMP_VERSION_MAP.descendingKeySet().iterator();
+        return newestFirst.hasNext() ? newestFirst.next() : -1;
+    }
+    
     public static String getVersion(long serverTimestamp) {
         /*
          * It is possible that when clients are trying to run upgrades concurrently, we could be at an intermediate

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a67137d/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 4c60aa9..6e9b40e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2356,13 +2356,38 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         return addColumn(oldMetaConnection, tableName, timestamp, columns, true);
     }
 
+    // Test hook: value init() sets as CURRENT_SCN_ATTRIB on the metadata connection; overridable by tests
+    protected long getSystemTableVersion() {
+        return MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+    }
+    
+    // Test hook: called by init() when the existing table's timestamp is below MIN_SYSTEM_TABLE_TIMESTAMP
+    protected void setUpgradeRequired() {
+        this.upgradeRequired.set(true);
+    }
+    
+    // Test hook: guard checked by init() (before and inside its synchronized block) to skip repeated work
+    protected boolean isInitialized() {
+        return initialized;
+    }
+    
+    // Test hook: records the initialization state; init() calls this with true from its finally block
+    protected void setInitialized(boolean isInitialized) {
+        initialized = isInitialized;
+    }
+
+    // Test hook: statement init() executes on the metadata connection; overridable by tests
+    protected String getSystemCatalogDML() {
+        return QueryConstants.CREATE_TABLE_METADATA;
+    }
+
     @Override
     public void init(final String url, final Properties props) throws SQLException {
         try {
             PhoenixContextExecutor.call(new Callable<Void>() {
                 @Override
                 public Void call() throws Exception {
-                    if (initialized) {
+                    if (isInitialized()) {
                         if (initializationException != null) {
                             // Throw previous initialization exception, as we won't resuse this instance
                             throw initializationException;
@@ -2370,7 +2395,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                         return null;
                     }
                     synchronized (ConnectionQueryServicesImpl.this) {
-                        if (initialized) {
+                        if (isInitialized()) {
                             if (initializationException != null) {
                                 // Throw previous initialization exception, as we won't resuse this instance
                                 throw initializationException;
@@ -2412,7 +2437,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             }
                             Properties scnProps = PropertiesUtil.deepCopy(props);
                             scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
-                                    Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+                                    Long.toString(getSystemTableVersion()));
                             scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
                             String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
                             try (HBaseAdmin hBaseAdmin = getAdmin();
@@ -2420,7 +2445,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                          scnProps, newEmptyMetaData())) {
                                 try {
                                     metaConnection.setRunningUpgrade(true);
-                                    metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+                                    metaConnection.createStatement().executeUpdate(getSystemCatalogDML());
                                 } catch (NewerTableAlreadyExistsException ignore) {
                                     // Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
                                     // timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
@@ -2428,7 +2453,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                 } catch (TableAlreadyExistsException e) {
                                     long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
                                     if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
-                                        ConnectionQueryServicesImpl.this.upgradeRequired.set(true);
+                                        setUpgradeRequired();
                                     }
                                 } catch (PhoenixIOException e) {
                                     if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
@@ -2487,7 +2512,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                                         throw initializationException;
                                     }
                                 } finally {
-                                    initialized = true;
+                                    setInitialized(true);
                                 }
                             }
                         }
@@ -2570,7 +2595,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
         boolean snapshotCreated = false;
         try {
-            if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+            if (!isUpgradeRequired()) {
                 throw new UpgradeNotRequiredException();
             }
             Properties scnProps = PropertiesUtil.deepCopy(props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2a67137d/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 8dd00c9..326efa3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -79,6 +79,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -163,6 +164,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  */
 
 public abstract class BaseTest {
+    public static final String DRIVER_CLASS_NAME_ATTRIB = "phoenix.driver.class.name";
+    
     private static final Map<String,String> tableDDLMap;
     private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
     protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
@@ -440,7 +443,7 @@ public abstract class BaseTest {
      * @return url to be used by clients to connect to the cluster.
      * @throws IOException 
      */
-    protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws IOException {
+    protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         boolean isDistributedCluster = isDistributedClusterModeEnabled(conf);
         if (!isDistributedCluster) {
             return initMiniCluster(conf, overrideProps);
@@ -538,8 +541,9 @@ public abstract class BaseTest {
      * Initialize the mini cluster using phoenix-test specific configuration.
      * @param overrideProps TODO
      * @return url to be used by clients to connect to the mini cluster.
+     * @throws Exception if the configured test driver class cannot be loaded or instantiated
      */
-    private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) {
+    private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         setUpConfigForMiniCluster(conf, overrideProps);
         utility = new HBaseTestingUtility(conf);
         try {
@@ -559,8 +563,9 @@ public abstract class BaseTest {
      * Initialize the cluster in distributed mode
      * @param overrideProps TODO
      * @return url to be used by clients to connect to the mini cluster.
+     * @throws Exception if the configured test driver class cannot be loaded or instantiated
      */
-    private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) {
+    private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         setTestConfigForDistribuedCluster(conf, overrideProps);
         try {
             IntegrationTestingUtility util =  new IntegrationTestingUtility(conf);
@@ -572,13 +577,13 @@ public abstract class BaseTest {
         return JDBC_PROTOCOL + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
     }
 
-    private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) {
+    private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         setDefaultTestConfig(conf, overrideProps);
     }
     
-    private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) {
+    private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         ConfigUtil.setReplicationConfigIfAbsent(conf);
-        QueryServices services = new PhoenixTestDriver().getQueryServices();
+        QueryServices services = newTestDriver(overrideProps).getQueryServices();
         for (Entry<String,String> entry : services.getProps()) {
             conf.set(entry.getKey(), entry.getValue());
         }
@@ -594,11 +599,11 @@ public abstract class BaseTest {
         }
     }
     
-    public static Configuration setUpConfigForMiniCluster(Configuration conf) {
+    public static Configuration setUpConfigForMiniCluster(Configuration conf) throws Exception {
         return setUpConfigForMiniCluster(conf, ReadOnlyProps.EMPTY_PROPS);
     }
     
-    public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) {
+    public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
         assertNotNull(conf);
         setDefaultTestConfig(conf, overrideProps);
         /*
@@ -625,12 +630,24 @@ public abstract class BaseTest {
         return conf;
     }
 
+    // Builds the driver named by DRIVER_CLASS_NAME_ATTRIB in props, or a plain PhoenixTestDriver if unset
+    private static PhoenixTestDriver newTestDriver(ReadOnlyProps props) throws Exception {
+        String driverClassName = props.get(DRIVER_CLASS_NAME_ATTRIB);
+        if (driverClassName == null) {
+            return new PhoenixTestDriver(props);
+        }
+        // asSubclass rejects a non-PhoenixTestDriver class with ClassCastException before instantiating it
+        Class<? extends PhoenixTestDriver> clazz =
+                Class.forName(driverClassName).asSubclass(PhoenixTestDriver.class);
+        Constructor<? extends PhoenixTestDriver> constr = clazz.getConstructor(ReadOnlyProps.class);
+        return constr.newInstance(props);
+    }
     /**
      * Create a {@link PhoenixTestDriver} and register it.
      * @return an initialized and registered {@link PhoenixTestDriver} 
      */
     public static PhoenixTestDriver initAndRegisterTestDriver(String url, ReadOnlyProps props) throws Exception {
-        PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
+        PhoenixTestDriver newDriver = newTestDriver(props);
         DriverManager.registerDriver(newDriver);
         Driver oldDriver = DriverManager.getDriver(url); 
         if (oldDriver != newDriver) {