You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ap...@apache.org on 2017/05/09 00:42:35 UTC

[3/4] phoenix git commit: PHOENIX-3818 Add client setting to disable server UPSERT SELECT work

PHOENIX-3818 Add client setting to disable server UPSERT SELECT work

Adds phoenix.client.enable.server.upsert.select property that is true
(enabled) by default. This acts as a feature toggle for PHOENIX-3271.

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b5312b4b
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b5312b4b
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b5312b4b

Branch: refs/heads/4.x-HBase-1.1
Commit: b5312b4bf718baf3a0025955793b4fdcb38e0774
Parents: 97ceda3
Author: Alex Araujo <al...@gmail.com>
Authored: Mon May 1 20:27:18 2017 -0500
Committer: Andrew Purtell <ap...@apache.org>
Committed: Mon May 8 17:34:59 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/rpc/PhoenixServerRpcIT.java  | 93 ++++++++++++++------
 .../apache/phoenix/compile/UpsertCompiler.java  | 14 ++-
 .../UngroupedAggregateRegionObserver.java       | 14 +--
 .../org/apache/phoenix/query/QueryServices.java |  3 +
 .../phoenix/query/QueryServicesOptions.java     |  4 +-
 .../org/apache/phoenix/util/ExpressionUtil.java | 14 +++
 6 files changed, 97 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 410f02c..b9e4fff 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -45,11 +45,12 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -67,14 +68,14 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
     	Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 
         		TestPhoenixIndexRpcSchedulerFactory.class.getName());
         // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue  
-    	Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 
-    			RpcControllerFactory.class.getName());      
+    	Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+    			RpcControllerFactory.class.getName());
         NUM_SLAVES_BASE = 2;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
     
-    @AfterClass
-    public static void cleanUpAfterTestSuite() throws Exception {
+    @After
+    public void cleanUpAfterTest() throws Exception {
         TestPhoenixIndexRpcSchedulerFactory.reset();
     }
     
@@ -91,26 +92,19 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = driver.connect(getUrl(), props);
         try {
-            // create the table 
-            conn.createStatement().execute(
-                    "CREATE TABLE " + dataTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create the table
+            createTable(conn, dataTableFullName);
     
-            // create the index 
-            conn.createStatement().execute(
-                    "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+            // create the index
+            createIndex(conn, indexName);
 
             ensureTablesOnDifferentRegionServers(dataTableFullName, indexTableFullName);
     
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
-            stmt.setString(1, "k1");
-            stmt.setString(2, "v1");
-            stmt.setString(3, "v2");
-            stmt.execute();
-            conn.commit();
+            upsertRow(conn, dataTableFullName);
     
             // run select query that should use the index
             String selectSql = "SELECT k, v2 from " + dataTableFullName + " WHERE v1=?";
-            stmt = conn.prepareStatement(selectSql);
+            PreparedStatement stmt = conn.prepareStatement(selectSql);
             stmt.setString(1, "v1");
     
             // verify that the query does a range scan on the index table
@@ -127,17 +121,11 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             // drop index table 
             conn.createStatement().execute(
                     "DROP INDEX " + indexName + " ON " + dataTableFullName );
-            // create a data table with the same name as the index table 
-            conn.createStatement().execute(
-                    "CREATE TABLE " + indexTableFullName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            // create a data table with the same name as the index table
+            createTable(conn, indexTableFullName);
             
             // upsert one row to the table (which has the same table name as the previous index table)
-            stmt = conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)");
-            stmt.setString(1, "k1");
-            stmt.setString(2, "v1");
-            stmt.setString(3, "v2");
-            stmt.execute();
-            conn.commit();
+            upsertRow(conn, indexTableFullName);
             
             // run select query on the new table
             selectSql = "SELECT k, v2 from " + indexTableFullName + " WHERE v1=?";
@@ -155,8 +143,7 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
             
             TestPhoenixIndexRpcSchedulerFactory.reset();
-            conn.createStatement().execute(
-                    "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+            createIndex(conn, indexName + "_1");
             // verify that that index queue is used and only once (during Upsert Select on server to build the index)
             Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
         }
@@ -165,6 +152,54 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    @Test
+    public void testUpsertSelectServerDisabled() throws Exception {
+        // turn the client-side toggle off before connecting
+        Properties clientProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        clientProps.setProperty(QueryServices.ENABLE_SERVER_UPSERT_SELECT, "false");
+        String targetTable = dataTableFullName + "_2";
+        try (Connection conn = driver.connect(getUrl(), clientProps)) {
+            // source and target get the same schema; only the source gets a row
+            createTable(conn, dataTableFullName);
+            upsertRow(conn, dataTableFullName);
+            createTable(conn, targetTable);
+            ensureTablesOnDifferentRegionServers(dataTableFullName, targetTable);
+            // copy the row across with UPSERT SELECT
+            upsertSelectRows(conn, dataTableFullName, targetTable);
+            // the index RPC executor must not have been handed a single call
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor(), Mockito.never())
+                    .dispatch(Mockito.any(CallRunner.class));
+        }
+    }
+
+    private void createTable(Connection conn, String tableName) throws SQLException {
+        String ddl = "CREATE TABLE " + tableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
+        try (java.sql.Statement stmt = conn.createStatement()) { stmt.execute(ddl); } // always close the statement
+    }
+
+    private void createIndex(Connection conn, String indexName) throws SQLException {
+        String ddl = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (v1) INCLUDE (v2)";
+        try (java.sql.Statement stmt = conn.createStatement()) { stmt.execute(ddl); } // always close the statement
+    }
+
+    private void upsertRow(Connection conn, String tableName) throws SQLException {
+        String[] row = { "k1", "v1", "v2" }; // one value per column: k, v1, v2
+        try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)")) {
+            for (int i = 0; i < row.length; i++) { stmt.setString(i + 1, row[i]); } // JDBC params are 1-based
+            stmt.execute();
+        } // statement is closed here even if binding or execution throws
+        conn.commit();
+    }
+
+    private void upsertSelectRows(Connection conn, String tableName1, String tableName2) throws SQLException {
+        // copies the k, v1, v2 columns of every row in tableName1 into tableName2
+        String sql = "UPSERT INTO " + tableName2 + " (k, v1, v2) SELECT k, v1, v2 FROM " + tableName1;
+        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+            stmt.execute();
+        } // statement is closed here even if execution throws
+        conn.commit();
+    }
+
 	/**
 	 * Verifies that the given tables each have a single region and are on
 	 * different region servers. If they are on the same server moves tableName2

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5559ad7..bbbd483 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -105,6 +105,7 @@ import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PUnsignedLong;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -335,6 +336,9 @@ public class UpsertCompiler {
         int nValuesToSet;
         boolean sameTable = false;
         boolean runOnServer = false;
+        boolean serverUpsertSelectEnabled =
+                services.getProps().getBoolean(QueryServices.ENABLE_SERVER_UPSERT_SELECT,
+                        QueryServicesOptions.DEFAULT_ENABLE_SERVER_UPSERT_SELECT);
         UpsertingParallelIteratorFactory parallelIteratorFactoryToBe = null;
         // Retry once if auto commit is off, as the meta data may
         // be out of date. We do not retry if auto commit is on, as we
@@ -505,7 +509,7 @@ public class UpsertCompiler {
                         && tableRefToBe.equals(selectResolver.getTables().get(0));
                     tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
                     /* We can run the upsert in a coprocessor if:
-                     * 1) from has only 1 table
+                     * 1) the select reads from the target table itself, or server UPSERT SELECT is enabled (never a join)
                      * 2) the select query isn't doing aggregation (which requires a client-side final merge)
                      * 3) autoCommit is on
                      * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
@@ -523,7 +527,7 @@ public class UpsertCompiler {
                         // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
                         // so we might be able to run it entirely on the server side.
                         // region space managed by region servers. So we bail out on executing on server side.
-                        runOnServer = isAutoCommit && !table.isTransactional()
+                        runOnServer = (sameTable || serverUpsertSelectEnabled) && isAutoCommit && !table.isTransactional()
                                 && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
                                 && !select.isJoin() && table.getRowTimestampColPos() == -1;
                     }
@@ -666,7 +670,11 @@ public class UpsertCompiler {
                     reverseColumnIndexes[tempPos] = pos;
                     reverseColumnIndexes[i] = i;
                 }
-
+                // If any pk slots are changing and server side UPSERT SELECT is disabled, do not run on server
+                if (!serverUpsertSelectEnabled && ExpressionUtil
+                        .isPkPositionChanging(new TableRef(table), projectedExpressions)) {
+                    runOnServer = false;
+                }
                 ////////////////////////////////////////////////////////////////////
                 // UPSERT SELECT run server-side
                 /////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2dec235..49ef884 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -112,6 +112,7 @@ import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -398,7 +399,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             values = new byte[projectedTable.getPKColumns().size()][];
             areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
                     region.getTableDesc().getTableName().getName()) == 0
-                    && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+                    && !ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
             
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -792,17 +793,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
-        // If the row ends up living in a different region, we'll get an error otherwise.
-        for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) {
-            PColumn column = tableRef.getTable().getPKColumns().get(i);
-            Expression source = projectedExpressions.get(i);
-            if (source == null || !source
-                    .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; }
-        }
-        return false;
-    }
-
     private boolean readyToCommit(MutationList mutations, int maxBatchSize, long maxBatchSizeBytes) {
         return !mutations.isEmpty() && (maxBatchSize > 0 && mutations.size() > maxBatchSize)
                 || (maxBatchSizeBytes > 0 && mutations.heapSize() > maxBatchSizeBytes);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 0b7b737..c01d11f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -248,6 +248,9 @@ public interface QueryServices extends SQLCloseable {
     public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.immutable.storage.scheme";
     public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB  = "phoenix.default.multitenant.immutable.storage.scheme";
 
+    // whether to enable server side RS -> RS calls for upsert select statements
+    public static final String ENABLE_SERVER_UPSERT_SELECT ="phoenix.client.enable.server.upsert.select";
+
     /**
      * Get executor service used for parallel scans
      */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 4fd1344..1ddf7eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -290,7 +290,9 @@ public class QueryServicesOptions {
                                                                                     // 4.10, psql and CSVBulkLoad
                                                                                     // expects binary data to be base 64
                                                                                     // encoded
-    
+    // RS -> RS calls for upsert select statements are enabled by default
+    public static final boolean DEFAULT_ENABLE_SERVER_UPSERT_SELECT = true;
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b5312b4b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index 1fbb534..fbd10fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -10,11 +10,15 @@
 package org.apache.phoenix.util;
 
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
 
@@ -54,4 +58,14 @@ public class ExpressionUtil {
         return false;
     }
 
+    public static boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
+        List<PColumn> pkColumns = tableRef.getTable().getPKColumns();
+        for (int slot = 0; slot < pkColumns.size(); slot++) { // slot i must be fed by PK column i itself
+            Expression projected = projectedExpressions.get(slot);
+            if (projected == null || !projected.equals(
+                    new ColumnRef(tableRef, pkColumns.get(slot).getPosition()).newColumnExpression())) { return true; }
+        }
+        return false;
+    }
+
 }