You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/11/26 04:39:06 UTC
[2/2] phoenix git commit: PHOENIX-1408 Don't disable table before
modifying HTable metadata (Samarth Jain)
PHOENIX-1408 Don't disable table before modifying HTable metadata (Samarth Jain)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c1d6da3f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c1d6da3f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c1d6da3f
Branch: refs/heads/4.0
Commit: c1d6da3fcc387c7428ad4523fe1afd2d8763eddd
Parents: ed4ad13
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Nov 25 17:34:45 2014 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Tue Nov 25 19:38:55 2014 -0800
----------------------------------------------------------------------
.../apache/phoenix/end2end/AlterTableIT.java | 19 +-
.../query/ConnectionQueryServicesImpl.java | 190 ++++++++++++++++---
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../phoenix/query/QueryServicesOptions.java | 28 ++-
4 files changed, 212 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1d6da3f/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index f711bd4..2943fe6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -911,4 +911,21 @@ public class AlterTableIT extends BaseHBaseManagedTimeIT {
ddl = "ALTER TABLE T ADD STRING_ARRAY1 VARCHAR[]";
conn1.createStatement().execute(ddl);
conn1.close();
- }}
+ }
+
+ @Test
+ public void testAddColumnForNewColumnFamily() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String ddl = "CREATE TABLE T (\n"
+ +"ID1 VARCHAR(15) NOT NULL,\n"
+ +"ID2 VARCHAR(15) NOT NULL,\n"
+ +"CREATED_DATE DATE,\n"
+ +"CREATION_TIME BIGINT,\n"
+ +"LAST_USED DATE,\n"
+ +"CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SALT_BUCKETS = 8";
+ Connection conn1 = DriverManager.getConnection(getUrl(), props);
+ conn1.createStatement().execute(ddl);
+ ddl = "ALTER TABLE T ADD CF.STRING VARCHAR";
+ conn1.createStatement().execute(ddl);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1d6da3f/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index b73e2dc..18f2f50 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
@@ -141,6 +142,7 @@ import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -476,6 +478,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
latestMetaDataLock.wait(waitTime);
} catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME
}
@@ -675,11 +679,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
try {
admin = new HBaseAdmin(config);
try {
- HTableDescriptor existingDesc = admin.getTableDescriptor(tableName);
- HColumnDescriptor oldDescriptor = existingDesc.getFamily(family.getFirst());
- HColumnDescriptor columnDescriptor = null;
+ HTableDescriptor existingTableDesc = admin.getTableDescriptor(tableName);
+ HColumnDescriptor oldColumnDesc = existingTableDesc.getFamily(family.getFirst());
+ HColumnDescriptor newColumnDesc = null;
- if (oldDescriptor == null) {
+ if (oldColumnDesc == null) {
if (tableType == PTableType.VIEW) {
String fullTableName = Bytes.toString(tableName);
throw new ReadOnlyTableException(
@@ -688,30 +692,30 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SchemaUtil.getTableNameFromFullName(fullTableName),
Bytes.toString(family.getFirst()));
}
- columnDescriptor = generateColumnFamilyDescriptor(family, tableType );
+ newColumnDesc = generateColumnFamilyDescriptor(family, tableType );
} else {
- columnDescriptor = new HColumnDescriptor(oldDescriptor);
+ newColumnDesc = new HColumnDescriptor(oldColumnDesc);
// Don't attempt to make any metadata changes for a VIEW
if (tableType == PTableType.VIEW) {
return;
}
- modifyColumnFamilyDescriptor(columnDescriptor, family);
+ modifyColumnFamilyDescriptor(newColumnDesc, family);
}
- if (columnDescriptor.equals(oldDescriptor)) {
+ if (newColumnDesc.equals(oldColumnDesc)) {
// Table already has family and it's the same.
return;
}
- admin.disableTable(tableName);
- if (oldDescriptor == null) {
- admin.addColumn(tableName, columnDescriptor);
- } else {
- admin.modifyColumn(tableName, columnDescriptor);
- }
- admin.enableTable(tableName);
+ addOrModifyColumnDescriptor(tableName, admin, oldColumnDesc, newColumnDesc);
} catch (org.apache.hadoop.hbase.TableNotFoundException e) {
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.TABLE_UNDEFINED).setRootCause(e).build().buildException();
- }
+ } catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
+ sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+ } catch (TimeoutException e) {
+ sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException();
+ }
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
} finally {
@@ -733,6 +737,117 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
}
+ private void addOrModifyColumnDescriptor(byte[] tableName, HBaseAdmin admin, HColumnDescriptor oldColumnDesc,
+ HColumnDescriptor newColumnDesc) throws IOException, org.apache.hadoop.hbase.TableNotFoundException,
+ InterruptedException, TimeoutException {
+ boolean isOnlineSchemaUpgradeEnabled = ConnectionQueryServicesImpl.this.props.getBoolean(
+ QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE,
+ QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE);
+ if (!isOnlineSchemaUpgradeEnabled) {
+ admin.disableTable(tableName);
+ if (oldColumnDesc == null) {
+ admin.addColumn(tableName, newColumnDesc);
+ } else {
+ admin.modifyColumn(tableName, newColumnDesc);
+ }
+ } else {
+ if (oldColumnDesc == null) {
+ admin.addColumn(tableName, newColumnDesc);
+ } else {
+ admin.modifyColumn(tableName, newColumnDesc);
+ }
+ pollForUpdatedColumnDescriptor(admin, tableName, newColumnDesc);
+ }
+ }
+
+ private static interface RetriableOperation {
+ boolean checkForCompletion() throws TimeoutException, org.apache.hadoop.hbase.TableNotFoundException, IOException;
+ String getOperatioName();
+ }
+
+ private void pollForUpdatedTableDescriptor(final HBaseAdmin admin, final HTableDescriptor newTableDescriptor,
+ final byte[] tableName) throws InterruptedException, TimeoutException {
+ checkAndRetry(new RetriableOperation() {
+
+ @Override
+ public String getOperatioName() {
+ return "UpdateOrNewTableDescriptor";
+ }
+
+ @Override
+ public boolean checkForCompletion() throws TimeoutException,
+ org.apache.hadoop.hbase.TableNotFoundException, IOException {
+ HTableDescriptor tableDesc = admin.getTableDescriptor(tableName);
+ return newTableDescriptor.equals(tableDesc);
+ }
+ });
+ }
+
+ private void pollForUpdatedColumnDescriptor(final HBaseAdmin admin, final byte[] tableName,
+ final HColumnDescriptor columnFamilyDesc) throws InterruptedException, TimeoutException {
+ checkAndRetry(new RetriableOperation() {
+
+ @Override
+ public String getOperatioName() {
+ return "UpdateOrNewColumnDescriptor";
+ }
+
+ @Override
+ public boolean checkForCompletion() throws TimeoutException,
+ org.apache.hadoop.hbase.TableNotFoundException, IOException {
+ HTableDescriptor newTableDesc = admin.getTableDescriptor(tableName);
+ return newTableDesc.getFamilies().contains(columnFamilyDesc);
+ }
+ });
+ }
+
+ private void checkAndRetry(RetriableOperation op) throws InterruptedException, TimeoutException {
+ int maxRetries = ConnectionQueryServicesImpl.this.props.getInt(
+ QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK,
+ QueryServicesOptions.DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK);
+ long sleepInterval = ConnectionQueryServicesImpl.this.props
+ .getLong(QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK,
+ QueryServicesOptions.DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK);
+ boolean success = false;
+ int numTries = 1;
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ do {
+ try {
+ success = op.checkForCompletion();
+ } catch (Exception ex) {
+ // If we encounter any exception on the first or last try, propagate the exception and fail.
+ // Else, we swallow the exception and retry till we reach maxRetries.
+ if (numTries == 1 || numTries == maxRetries) {
+ watch.stop();
+ TimeoutException toThrow = new TimeoutException("Operation " + op.getOperatioName()
+ + " didn't complete because of exception. Time elapsed: " + watch.elapsedMillis());
+ toThrow.initCause(ex);
+ throw toThrow;
+ }
+ }
+ numTries++;
+ Thread.sleep(sleepInterval);
+ } while (numTries < maxRetries && !success);
+
+ watch.stop();
+
+ if (!success) {
+ throw new TimeoutException("Operation " + op.getOperatioName() + " didn't complete within "
+ + watch.elapsedMillis() + " ms "
+ + (numTries > 1 ? ("after retrying " + numTries + (numTries > 1 ? "times." : "time.")) : ""));
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Operation "
+ + op.getOperatioName()
+ + " completed within "
+ + watch.elapsedMillis()
+ + "ms "
+ + (numTries > 1 ? ("after retrying " + numTries + (numTries > 1 ? "times." : "time.")) : ""));
+ }
+ }
+ }
+
/**
*
* @param tableName
@@ -794,16 +909,18 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
return existingDesc;
}
- // TODO: Take advantage of online schema change ability by setting "hbase.online.schema.update.enable" to true
- admin.disableTable(tableName);
- admin.modifyTable(tableName, newDesc);
- admin.enableTable(tableName);
-
+ modifyTable(tableName, admin, newDesc);
return newDesc;
}
} catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
+ } catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
+ sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
+ } catch (TimeoutException e) {
+ sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setRootCause(e.getCause() != null ? e.getCause() : e).build().buildException();
} finally {
try {
if (admin != null) {
@@ -823,6 +940,21 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
}
return null; // will never make it here
}
+
+ private void modifyTable(byte[] tableName, HBaseAdmin admin, HTableDescriptor newDesc) throws IOException,
+ org.apache.hadoop.hbase.TableNotFoundException, InterruptedException, TimeoutException {
+ boolean isOnlineSchemaUpgradeEnabled = ConnectionQueryServicesImpl.this.props.getBoolean(
+ QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE,
+ QueryServicesOptions.DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE);
+ if (!isOnlineSchemaUpgradeEnabled) {
+ admin.disableTable(tableName);
+ admin.modifyTable(tableName, newDesc);
+ admin.enableTable(tableName);
+ } else {
+ admin.modifyTable(tableName, newDesc);
+ pollForUpdatedTableDescriptor(admin, newDesc, tableName);
+ }
+ }
private static boolean isInvalidMutableIndexConfig(Long serverVersion) {
if (serverVersion == null) {
@@ -1671,6 +1803,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (IOException e) {
throw new PhoenixIOException(e);
} catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build()
.buildException();
} finally {
@@ -1858,9 +1992,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SQLException sqlE = null;
try {
resultObjects= hTable.batch(incrementBatch);
- } catch (IOException e){
+ } catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
- } catch (InterruptedException e){
+ } catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
@@ -1980,6 +2116,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
} catch (IOException e){
sqlE = ServerUtil.parseServerException(e);
} catch (InterruptedException e){
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
@@ -2027,9 +2165,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
SQLException sqlE = null;
try {
hTable.batch(mutations);
- } catch (IOException e){
+ } catch (IOException e) {
sqlE = ServerUtil.parseServerException(e);
- } catch (InterruptedException e){
+ } catch (InterruptedException e) {
+ // restore the interrupt status
+ Thread.currentThread().interrupt();
sqlE = new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION)
.setRootCause(e).build().buildException(); // FIXME ?
} finally {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1d6da3f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ab1a8e5..eb72a83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -141,6 +141,9 @@ public interface QueryServices extends SQLCloseable {
public static final String SEQUENCE_SALT_BUCKETS_ATTRIB = "phoenix.sequence.saltBuckets";
public static final String COPROCESSOR_PRIORITY_ATTRIB = "phoenix.coprocessor.priority";
public static final String EXPLAIN_CHUNK_COUNT_ATTRIB = "phoenix.explain.displayChunkCount";
+ public static final String ALLOW_ONLINE_TABLE_SCHEMA_UPDATE = "hbase.online.schema.update.enable";
+ public static final String NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.retries";
+ public static final String DELAY_FOR_SCHEMA_UPDATE_CHECK = "phoenix.schema.change.delay";
/**
* Get executor service used for parallel scans
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c1d6da3f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 460b199..7cfa3aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -17,9 +17,11 @@
*/
package org.apache.phoenix.query;
+import static org.apache.phoenix.query.QueryServices.ALLOW_ONLINE_TABLE_SCHEMA_UPDATE;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.DELAY_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
import static org.apache.phoenix.query.QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.GROUPBY_MAX_CACHE_SIZE_ATTRIB;
@@ -40,6 +42,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATT
import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK;
import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB;
import static org.apache.phoenix.query.QueryServices.REGIONSERVER_LEASE_PERIOD_ATTRIB;
@@ -167,7 +170,10 @@ public class QueryServicesOptions {
*/
public static final int DEFAULT_COPROCESSOR_PRIORITY = Coprocessor.PRIORITY_SYSTEM/2 + Coprocessor.PRIORITY_USER/2; // Divide individually to prevent any overflow
public static final boolean DEFAULT_EXPLAIN_CHUNK_COUNT = true;
-
+ public static final boolean DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE = true;
+ public static final int DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK = 10;
+ public static final long DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK = 5 * 1000; // 5 seconds.
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
@@ -215,6 +221,9 @@ public class QueryServicesOptions {
.setIfUnset(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES)
.setIfUnset(SEQUENCE_CACHE_SIZE_ATTRIB, DEFAULT_SEQUENCE_CACHE_SIZE)
.setIfUnset(SCAN_RESULT_CHUNK_SIZE, DEFAULT_SCAN_RESULT_CHUNK_SIZE)
+ .setIfUnset(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, DEFAULT_ALLOW_ONLINE_TABLE_SCHEMA_UPDATE)
+ .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
+ .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK);
;
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
@@ -413,7 +422,7 @@ public class QueryServicesOptions {
public int getSpillableGroupByNumSpillFiles() {
return config.getInt(GROUPBY_SPILL_FILES_ATTRIB, DEFAULT_GROUPBY_SPILL_FILES);
}
-
+
public QueryServicesOptions setMaxServerCacheTTLMs(int ttl) {
return set(MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, ttl);
}
@@ -468,4 +477,19 @@ public class QueryServicesOptions {
return this;
}
+ public QueryServicesOptions setAllowOnlineSchemaUpdate(boolean allow) {
+ config.setBoolean(ALLOW_ONLINE_TABLE_SCHEMA_UPDATE, allow);
+ return this;
+ }
+
+ public QueryServicesOptions setNumRetriesForSchemaChangeCheck(int numRetries) {
+ config.setInt(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, numRetries);
+ return this;
+ }
+
+ public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) {
+ config.setLong(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, delayInMillis);
+ return this;
+ }
+
}