You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/05/09 00:42:36 UTC
[4/4] phoenix git commit: PHOENIX-3818 Add client setting to disable
server UPSERT SELECT work
PHOENIX-3818 Add client setting to disable server UPSERT SELECT work
Adds phoenix.client.enable.server.upsert.select property that is true
(enabled) by default. This acts as a feature toggle for PHOENIX-3271.
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ed30d1ff
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ed30d1ff
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ed30d1ff
Branch: refs/heads/4.x-HBase-0.98
Commit: ed30d1ff151eecbd2161d197c3cf7159f6707e6e
Parents: 6befc6c
Author: Alex Araujo <al...@gmail.com>
Authored: Mon May 1 20:27:18 2017 -0500
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon May 8 17:35:03 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/rpc/PhoenixServerRpcIT.java | 93 ++++++++++++++------
.../apache/phoenix/compile/UpsertCompiler.java | 14 ++-
.../UngroupedAggregateRegionObserver.java | 14 +--
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../phoenix/query/QueryServicesOptions.java | 4 +-
.../org/apache/phoenix/util/ExpressionUtil.java | 14 +++
6 files changed, 97 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 8f95b32..6782c3e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -44,11 +44,12 @@ import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -66,14 +67,14 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
Map<String, String> serverProps = Collections.singletonMap(HRegionServer.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
TestPhoenixIndexRpcSchedulerFactory.class.getName());
// use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue
- Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
- RpcControllerFactory.class.getName());
+ Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+ RpcControllerFactory.class.getName());
NUM_SLAVES_BASE = 2;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @AfterClass
- public static void cleanUpAfterTestSuite() throws Exception {
+ @After
+ public void cleanUpAfterTest() throws Exception {
TestPhoenixIndexRpcSchedulerFactory.reset();
}
@@ -90,26 +91,19 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
Connection conn = driver.connect(getUrl(), props);
try {
- // create the table
- conn.createStatement().execute(
- "CREATE TABLE " + dataTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ // create the table
+ createTable(conn, dataTableFullName);
- // create the index
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+ // create the index
+ createIndex(conn, indexName);
ensureTablesOnDifferentRegionServers(dataTableFullName, indexTableFullName);
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
- stmt.setString(1, "k1");
- stmt.setString(2, "v1");
- stmt.setString(3, "v2");
- stmt.execute();
- conn.commit();
+ upsertRow(conn, dataTableFullName);
// run select query that should use the index
String selectSql = "SELECT k, v2 from " + dataTableFullName + " WHERE v1=?";
- stmt = conn.prepareStatement(selectSql);
+ PreparedStatement stmt = conn.prepareStatement(selectSql);
stmt.setString(1, "v1");
// verify that the query does a range scan on the index table
@@ -126,17 +120,11 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
// drop index table
conn.createStatement().execute(
"DROP INDEX " + indexName + " ON " + dataTableFullName );
- // create a data table with the same name as the index table
- conn.createStatement().execute(
- "CREATE TABLE " + indexTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ // create a data table with the same name as the index table
+ createTable(conn, indexTableFullName);
// upsert one row to the table (which has the same table name as the previous index table)
- stmt = conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)");
- stmt.setString(1, "k1");
- stmt.setString(2, "v1");
- stmt.setString(3, "v2");
- stmt.execute();
- conn.commit();
+ upsertRow(conn, indexTableFullName);
// run select query on the new table
selectSql = "SELECT k, v2 from " + indexTableFullName + " WHERE v1=?";
@@ -154,8 +142,7 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
TestPhoenixIndexRpcSchedulerFactory.reset();
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+ createIndex(conn, indexName + "_1");
// verify that that index queue is used and only once (during Upsert Select on server to build the index)
Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
}
@@ -164,6 +151,54 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test
+ public void testUpsertSelectServerDisabled() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // disable server side upsert select
+ props.setProperty(QueryServices.ENABLE_SERVER_UPSERT_SELECT, "false");
+ try (Connection conn = driver.connect(getUrl(), props)) {
+ // create two tables with identical schemas
+ createTable(conn, dataTableFullName);
+ upsertRow(conn, dataTableFullName);
+ String tableName2 = dataTableFullName + "_2";
+ createTable(conn, tableName2);
+ ensureTablesOnDifferentRegionServers(dataTableFullName, tableName2);
+ // copy the row from the first table using upsert select
+ upsertSelectRows(conn, dataTableFullName, tableName2);
+ Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(),
+ Mockito.never()).dispatch(Mockito.any(CallRunner.class));
+
+ }
+ }
+
+ private void createTable(Connection conn, String tableName) throws SQLException {
+ conn.createStatement().execute(
+ "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ }
+
+ private void createIndex(Connection conn, String indexName) throws SQLException {
+ conn.createStatement().execute(
+ "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+ }
+
+ private void upsertRow(Connection conn, String tableName) throws SQLException {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+ stmt.setString(1, "k1");
+ stmt.setString(2, "v1");
+ stmt.setString(3, "v2");
+ stmt.execute();
+ conn.commit();
+ }
+
+ private void upsertSelectRows(Connection conn, String tableName1, String tableName2) throws SQLException {
+ PreparedStatement stmt =
+ conn.prepareStatement(
+ "UPSERT INTO " + tableName2 + " (k, v1, v2) SELECT k, v1, v2 FROM "
+ + tableName1);
+ stmt.execute();
+ conn.commit();
+ }
+
/**
* Verifies that the given tables each have a single region and are on
* different region servers. If they are on the same server moves tableName2
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 2304d83..931513a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -105,6 +105,7 @@ import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.schema.types.PUnsignedLong;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -335,6 +336,9 @@ public class UpsertCompiler {
int nValuesToSet;
boolean sameTable = false;
boolean runOnServer = false;
+ boolean serverUpsertSelectEnabled =
+ services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT,
+ QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT);
UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
// Retry once if auto commit is off, as the meta data may
// be out of date. We do not retry if auto commit is on, as we
@@ -505,7 +509,7 @@ public class UpsertCompiler {
&& tableRefToBe.equals(selectResolver.getTables().get(0));
tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
/* We can run the upsert in a coprocessor if:
- * 1) from has only 1 table
+ * 1) from has only 1 table or server UPSERT SELECT is enabled
* 2) the select query isn't doing aggregation (which requires a client-side final merge)
* 3) autoCommit is on
* 4) the table is not immutable with indexes, as the client is the one that figures out the additional
@@ -523,7 +527,7 @@ public class UpsertCompiler {
// If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
// so we might be able to run it entirely on the server side.
// region space managed by region servers. So we bail out on executing on server side.
- runOnServer = isAutoCommit && !table.isTransactional()
+ runOnServer = (sameTable || serverUpsertSelectEnabled) && isAutoCommit && !table.isTransactional()
&& !(table.isImmutableRows() && !table.getIndexes().isEmpty())
&& !select.isJoin() && table.getRowTimestampColPos() == -1;
}
@@ -666,7 +670,11 @@ public class UpsertCompiler {
reverseColumnIndexes[tempPos] = pos;
reverseColumnIndexes[i] = i;
}
-
+ // If any pk slots are changing and server side UPSERT SELECT is disabled, do not run on server
+ if (!serverUpsertSelectEnabled && ExpressionUtil
+ .isPkPositionChanging(new TableRef(table), projectedExpressions)) {
+ runOnServer = false;
+ }
////////////////////////////////////////////////////////////////////
// UPSERT SELECT run server-side
/////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 92bde94..23b8be0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -112,6 +112,7 @@ import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.KeyValueUtil;
import org.apache.phoenix.util.LogUtil;
@@ -397,7 +398,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
values = new byte[projectedTable.getPKColumns().size()][];
areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
region.getTableDesc().getTableName().getName()) == 0
- && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+ && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
} else {
byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -791,17 +792,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
- // If the row ends up living in a different region, we'll get an error otherwise.
- for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) {
- PColumn column = tableRef.getTable().getPKColumns().get(i);
- Expression source = projectedExpressions.get(i);
- if (source == null || !source
- .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; }
- }
- return false;
- }
-
private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) {
return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize)
|| (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 6e75370..2627207 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -246,6 +246,9 @@ public interface QueryServices extends SQLCloseable {
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
+ // whether to enable server side RS -> RS calls for upsert select statements
+ public static final String ENABLE_SERVER_UPSERT_SELECT ="phoenix.client.enable.server.upsert.select";
+
/**
* Get executor service used for parallel scans
*/
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0c4ebc0..eef964f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -287,7 +287,9 @@ public class QueryServicesOptions {
// 4.10, psql and CSVBulkLoad
// expects binary data to be base 64
// encoded
-
+ // RS -> RS calls for upsert select statements are enabled by default
+ public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = true;
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ed30d1ff/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index 1fbb534..fbd10fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -10,11 +10,15 @@
package org.apache.phoenix.util;
import java.sql.SQLException;
+import java.util.List;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
@@ -54,4 +58,14 @@ public class ExpressionUtil {
return false;
}
+ public static boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
+ for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) {
+ PColumn column = tableRef.getTable().getPKColumns().get(i);
+ Expression source = projectedExpressions.get(i);
+ if (source == null || !source
+ .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; }
+ }
+ return false;
+ }
+
}