You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/11/15 18:34:45 UTC
[17/40] phoenix git commit: PHOENIX-4335 System catalog snapshot
created each time a new connection is created
PHOENIX-4335 System catalog snapshot created each time a new connection is created
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2dab4f60
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2dab4f60
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2dab4f60
Branch: refs/heads/4.x-HBase-1.2
Commit: 2dab4f606c2eb164c2595ae6acffb5f7d8a5b38d
Parents: 0e97de5
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Oct 31 15:55:03 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Nov 15 10:02:14 2017 -0800
----------------------------------------------------------------------
.../phoenix/end2end/SystemCatalogUpgradeIT.java | 121 +++++++++++++++++++
.../phoenix/coprocessor/MetaDataProtocol.java | 12 +-
.../query/ConnectionQueryServicesImpl.java | 39 ++++--
.../java/org/apache/phoenix/query/BaseTest.java | 35 ++++--
4 files changed, 190 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dab4f60/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
new file mode 100644
index 0000000..e5b1d6e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SystemCatalogUpgradeIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServicesImpl;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesTestImpl;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Verifies that an upgrade of the system catalog is attempted at most once, no matter how many
+ * connections are opened afterwards.
+ *
+ * The test installs a custom driver whose query services count calls to
+ * {@code setUpgradeRequired()} and let the test control both the reported system table
+ * version and whether the services claim to be initialized.
+ */
+public class SystemCatalogUpgradeIT extends BaseTest {
+    /** While set, the counting services report themselves as uninitialized so that init runs again. */
+    private static boolean reinitialize;
+    /** Number of times {@code setUpgradeRequired()} was invoked on the counting services. */
+    private static int countUpgradeAttempts;
+    /** Version reported by the counting services; starts at {@code MetaDataProtocol.getPriorVersion()}. */
+    private static long systemTableVersion = MetaDataProtocol.getPriorVersion();
+
+    /**
+     * Query services that record upgrade requests and expose the test-controlled
+     * version and initialization state.
+     */
+    private static class PhoenixUpgradeCountingServices extends ConnectionQueryServicesImpl {
+        public PhoenixUpgradeCountingServices(QueryServices services, ConnectionInfo connectionInfo, Properties info) {
+            super(services, connectionInfo, info);
+        }
+
+        /** Delegates to the superclass and counts the request. */
+        @Override
+        protected void setUpgradeRequired() {
+            super.setUpgradeRequired();
+            countUpgradeAttempts++;
+        }
+
+        /** Reports the version currently chosen by the test instead of the built-in constant. */
+        @Override
+        protected long getSystemTableVersion() {
+            return systemTableVersion;
+        }
+
+        /** Reports "not initialized" for as long as the test has requested a re-initialization. */
+        @Override
+        protected boolean isInitialized() {
+            return !reinitialize && super.isInitialized();
+        }
+    }
+
+    /**
+     * Test driver that hands out a single {@link PhoenixUpgradeCountingServices} instance and
+     * re-runs its initialization when the test asks for it.
+     */
+    public static class PhoenixUpgradeCountingDriver extends PhoenixTestDriver {
+        private ConnectionQueryServices cqs;
+        private final ReadOnlyProps overrideProps;
+
+        public PhoenixUpgradeCountingDriver(ReadOnlyProps props) {
+            overrideProps = props;
+        }
+
+        /** Accepts every URL. */
+        @Override
+        public boolean acceptsURL(String url) throws SQLException {
+            return true;
+        }
+
+        /**
+         * Lazily creates and initializes the counting services on first use. On later calls the
+         * same instance is returned; if a re-initialization was requested, {@code init} is
+         * invoked again first and the request flag is cleared once it returns normally.
+         */
+        @Override // public for testing
+        public synchronized ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException {
+            if (cqs == null) {
+                cqs = new PhoenixUpgradeCountingServices(new QueryServicesTestImpl(getDefaultProps(), overrideProps), ConnectionInfo.create(url), info);
+                cqs.init(url, info);
+            } else if (reinitialize) {
+                cqs.init(url, info);
+                reinitialize = false;
+            }
+            return cqs;
+        }
+    }
+
+    /** Passes the counting driver's class name to the test driver set-up via {@code DRIVER_CLASS_NAME_ATTRIB}. */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, PhoenixUpgradeCountingDriver.class.getName());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testUpgradeOnlyHappensOnce() throws Exception {
+        // Every connection opened by this test is closed again; the query services instance is
+        // held by the driver (its cqs field) and is handed out again to later connections.
+        try (PhoenixConnection conn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class)) {
+            ConnectionQueryServices services = conn.getQueryServices();
+            assertTrue(services instanceof PhoenixUpgradeCountingServices);
+        }
+        // Check if the timestamp version is changing between the current version and prior version
+        boolean wasTimestampChanged = systemTableVersion != MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+        int expectedUpgradeAttempts = wasTimestampChanged ? 1 : 0;
+        reinitialize = true;
+        systemTableVersion = MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+        DriverManager.getConnection(getUrl()).close();
+        // Confirm that an upgrade was attempted exactly once if the timestamp changed, and not
+        // at all if it did not.
+        assertEquals(expectedUpgradeAttempts, countUpgradeAttempts);
+        // Confirm that another connection does not increase the number of times upgrade was attempted
+        DriverManager.getConnection(getUrl()).close();
+        assertEquals(expectedUpgradeAttempts, countUpgradeAttempts);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dab4f60/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 655068d..c4ecc3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.coprocessor;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
@@ -89,7 +90,8 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0 = MIN_TABLE_TIMESTAMP + 27;
- public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_TABLE_TIMESTAMP + 28;
+ // Since there's no upgrade code, keep the version the same as the previous version
+ public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0;
@@ -431,6 +433,14 @@ public abstract class MetaDataProtocol extends MetaDataService {
}
}
+    /**
+     * Returns the greatest timestamp key registered in {@code TIMESTAMP_VERSION_MAP}.
+     *
+     * NOTE(review): despite the name, this is the highest key in the map; it is only "prior"
+     * to the current version if the current version is not itself a key - confirm against
+     * the map's static initializer.
+     *
+     * @return the largest key of the map, or {@code -1} when the map is empty
+     */
+    public static long getPriorVersion() {
+        if (TIMESTAMP_VERSION_MAP.isEmpty()) {
+            return -1;
+        }
+        // The last key is the first element of the descending key set.
+        return TIMESTAMP_VERSION_MAP.lastKey();
+    }
+
public static String getVersion(long serverTimestamp) {
/*
* It is possible that when clients are trying to run upgrades concurrently, we could be at an intermediate
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dab4f60/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 03cb7de..532b586 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2353,13 +2353,38 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return addColumn(oldMetaConnection, tableName, timestamp, columns, true);
}
+    /**
+     * Test hook: supplies the timestamp that {@code init} sets as the
+     * {@code CURRENT_SCN} property of the connection used to run the system catalog statement.
+     *
+     * @return {@code MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP} unless a subclass overrides it
+     */
+    protected long getSystemTableVersion() {
+        return MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
+    }
+
+    /**
+     * Test hook: flags that an upgrade is required by setting the {@code upgradeRequired}
+     * flag to {@code true}. Subclasses may override this to observe the request.
+     */
+    protected void setUpgradeRequired() {
+        upgradeRequired.set(true);
+    }
+
+    /**
+     * Test hook: tells {@code init} whether initialization has already been performed.
+     *
+     * @return the current value of the {@code initialized} field
+     */
+    protected boolean isInitialized() {
+        return this.initialized;
+    }
+
+    /**
+     * Test hook: records whether initialization has been performed.
+     *
+     * @param isInitialized the new value for the {@code initialized} field
+     */
+    protected void setInitialized(boolean isInitialized) {
+        this.initialized = isInitialized;
+    }
+
+    /**
+     * Test hook: supplies the statement that {@code init} executes on the metadata connection.
+     *
+     * @return {@code QueryConstants.CREATE_TABLE_METADATA} unless a subclass overrides it
+     */
+    protected String getSystemCatalogDML() {
+        return QueryConstants.CREATE_TABLE_METADATA;
+    }
+
@Override
public void init(final String url, final Properties props) throws SQLException {
try {
PhoenixContextExecutor.call(new Callable<Void>() {
@Override
public Void call() throws Exception {
- if (initialized) {
+ if (isInitialized()) {
if (initializationException != null) {
// Throw previous initialization exception, as we won't reuse this instance
throw initializationException;
@@ -2367,7 +2392,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return null;
}
synchronized (ConnectionQueryServicesImpl.this) {
- if (initialized) {
+ if (isInitialized()) {
if (initializationException != null) {
// Throw previous initialization exception, as we won't reuse this instance
throw initializationException;
@@ -2409,7 +2434,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
Properties scnProps = PropertiesUtil.deepCopy(props);
scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
- Long.toString(MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP));
+ Long.toString(getSystemTableVersion()));
scnProps.remove(PhoenixRuntime.TENANT_ID_ATTRIB);
String globalUrl = JDBCUtil.removeProperty(url, PhoenixRuntime.TENANT_ID_ATTRIB);
try (HBaseAdmin hBaseAdmin = getAdmin();
@@ -2417,7 +2442,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
scnProps, newEmptyMetaData())) {
try {
metaConnection.setRunningUpgrade(true);
- metaConnection.createStatement().executeUpdate(QueryConstants.CREATE_TABLE_METADATA);
+ metaConnection.createStatement().executeUpdate(getSystemCatalogDML());
} catch (NewerTableAlreadyExistsException ignore) {
// Ignore, as this will happen if the SYSTEM.CATALOG already exists at this fixed
// timestamp. A TableAlreadyExistsException is not thrown, since the table only exists
@@ -2425,7 +2450,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (TableAlreadyExistsException e) {
long currentServerSideTableTimeStamp = e.getTable().getTimeStamp();
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP) {
- ConnectionQueryServicesImpl.this.upgradeRequired.set(true);
+ setUpgradeRequired();
}
} catch (PhoenixIOException e) {
if (!Iterables.isEmpty(Iterables.filter(Throwables.getCausalChain(e), AccessDeniedException.class))) {
@@ -2484,7 +2509,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
throw initializationException;
}
} finally {
- initialized = true;
+ setInitialized(true);
}
}
}
@@ -2567,7 +2592,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE);
boolean snapshotCreated = false;
try {
- if (!ConnectionQueryServicesImpl.this.upgradeRequired.get()) {
+ if (!isUpgradeRequired()) {
throw new UpgradeNotRequiredException();
}
Properties scnProps = PropertiesUtil.deepCopy(props);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dab4f60/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 8dd00c9..326efa3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -79,6 +79,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -163,6 +164,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public abstract class BaseTest {
+ public static final String DRIVER_CLASS_NAME_ATTRIB = "phoenix.driver.class.name";
+
private static final Map<String,String> tableDDLMap;
private static final Logger logger = LoggerFactory.getLogger(BaseTest.class);
protected static final int DEFAULT_TXN_TIMEOUT_SECONDS = 30;
@@ -440,7 +443,7 @@ public abstract class BaseTest {
* @return url to be used by clients to connect to the cluster.
* @throws IOException
*/
- protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws IOException {
+ protected static String setUpTestCluster(@Nonnull Configuration conf, ReadOnlyProps overrideProps) throws Exception {
boolean isDistributedCluster = isDistributedClusterModeEnabled(conf);
if (!isDistributedCluster) {
return initMiniCluster(conf, overrideProps);
@@ -538,8 +541,9 @@ public abstract class BaseTest {
* Initialize the mini cluster using phoenix-test specific configuration.
* @param overrideProps TODO
* @return url to be used by clients to connect to the mini cluster.
+ * @throws Exception if the configuration cannot be set up, e.g. the configured test driver cannot be instantiated
*/
- private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) {
+ private static String initMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
setUpConfigForMiniCluster(conf, overrideProps);
utility = new HBaseTestingUtility(conf);
try {
@@ -559,8 +563,9 @@ public abstract class BaseTest {
* Initialize the cluster in distributed mode
* @param overrideProps TODO
* @return url to be used by clients to connect to the mini cluster.
+ * @throws Exception if the configuration cannot be set up, e.g. the configured test driver cannot be instantiated
*/
- private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) {
+ private static String initClusterDistributedMode(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
setTestConfigForDistribuedCluster(conf, overrideProps);
try {
IntegrationTestingUtility util = new IntegrationTestingUtility(conf);
@@ -572,13 +577,13 @@ public abstract class BaseTest {
return JDBC_PROTOCOL + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
}
- private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) {
+ private static void setTestConfigForDistribuedCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
setDefaultTestConfig(conf, overrideProps);
}
- private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) {
+ private static void setDefaultTestConfig(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
ConfigUtil.setReplicationConfigIfAbsent(conf);
- QueryServices services = new PhoenixTestDriver().getQueryServices();
+ QueryServices services = newTestDriver(overrideProps).getQueryServices();
for (Entry<String,String> entry : services.getProps()) {
conf.set(entry.getKey(), entry.getValue());
}
@@ -594,11 +599,11 @@ public abstract class BaseTest {
}
}
- public static Configuration setUpConfigForMiniCluster(Configuration conf) {
+ public static Configuration setUpConfigForMiniCluster(Configuration conf) throws Exception {
return setUpConfigForMiniCluster(conf, ReadOnlyProps.EMPTY_PROPS);
}
- public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) {
+ public static Configuration setUpConfigForMiniCluster(Configuration conf, ReadOnlyProps overrideProps) throws Exception {
assertNotNull(conf);
setDefaultTestConfig(conf, overrideProps);
/*
@@ -625,12 +630,24 @@ public abstract class BaseTest {
return conf;
}
+    /**
+     * Creates the test driver to use. When {@code props} names a driver class under
+     * {@link #DRIVER_CLASS_NAME_ATTRIB}, that class is loaded reflectively and instantiated
+     * through its public constructor taking a {@link ReadOnlyProps}; otherwise a plain
+     * {@link PhoenixTestDriver} is created. The returned driver is not registered here.
+     *
+     * @param props override properties; also passed to the driver's constructor
+     * @return a new driver instance
+     * @throws Exception if the named class cannot be loaded, is not a {@link PhoenixTestDriver}
+     *         subclass, lacks the expected constructor, or its constructor fails
+     */
+    private static PhoenixTestDriver newTestDriver(ReadOnlyProps props) throws Exception {
+        String driverClassName = props.get(DRIVER_CLASS_NAME_ATTRIB);
+        if (driverClassName == null) {
+            return new PhoenixTestDriver(props);
+        }
+        // asSubclass checks the type up front, so a wrong class name fails with a
+        // ClassCastException before any instance is constructed, and no raw types or
+        // unchecked casts are needed.
+        Class<? extends PhoenixTestDriver> driverClass =
+                Class.forName(driverClassName).asSubclass(PhoenixTestDriver.class);
+        Constructor<? extends PhoenixTestDriver> constructor = driverClass.getConstructor(ReadOnlyProps.class);
+        return constructor.newInstance(props);
+    }
/**
* Create a {@link PhoenixTestDriver} and register it.
* @return an initialized and registered {@link PhoenixTestDriver}
*/
public static PhoenixTestDriver initAndRegisterTestDriver(String url, ReadOnlyProps props) throws Exception {
- PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
+ PhoenixTestDriver newDriver = newTestDriver(props);
DriverManager.registerDriver(newDriver);
Driver oldDriver = DriverManager.getDriver(url);
if (oldDriver != newDriver) {