You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/04/16 16:31:52 UTC

[11/50] [abbrv] phoenix git commit: PHOENIX-1795 Set handlerCount, numQueues and maxQueueLength of index and metadata queues correctly

PHOENIX-1795 Set handlerCount, numQueues and maxQueueLength of index and metadata queues correctly


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e2cf44c3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e2cf44c3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e2cf44c3

Branch: refs/heads/calcite
Commit: e2cf44c3c22f8789c4bd1fe529f07f2d6e45e482
Parents: d05d7c8
Author: Thomas <td...@salesforce.com>
Authored: Mon Mar 30 15:21:44 2015 -0700
Committer: Thomas <td...@salesforce.com>
Committed: Tue Mar 31 13:34:17 2015 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/QueryDatabaseMetaDataIT.java   |  1 +
 .../org/apache/phoenix/rpc/PhoenixClientRpcIT.java | 17 ++++-------------
 .../org/apache/phoenix/rpc/PhoenixServerRpcIT.java | 15 ++++++---------
 .../hadoop/hbase/ipc/PhoenixRpcScheduler.java      | 16 +++++++++++-----
 .../org/apache/phoenix/query/QueryServices.java    |  4 ++++
 .../apache/phoenix/query/QueryServicesOptions.java | 10 ++++++++--
 .../org/apache/phoenix/jdbc/PhoenixTestDriver.java |  5 ++---
 .../java/org/apache/phoenix/query/BaseTest.java    | 10 ++++++++++
 8 files changed, 46 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
index 44086d7..c9ec0ce 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryDatabaseMetaDataIT.java
@@ -682,6 +682,7 @@ public class QueryDatabaseMetaDataIT extends BaseClientManagedTimeIT {
             descriptor.addFamily(columnDescriptor);
         }
         admin.createTable(descriptor);
+        admin.close();
             
         long ts = nextTimestamp();
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
index deb14db..0c61b55 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixClientRpcIT.java
@@ -17,13 +17,11 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.ipc.CallRunner;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -35,8 +33,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Maps;
-
 public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
 
     private static final String SCHEMA_NAME = "S";
@@ -45,15 +41,10 @@ public class PhoenixClientRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
 
     @BeforeClass
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
-        serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-                TestPhoenixIndexRpcSchedulerFactory.class.getName());
-        serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
+        Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 
+        		TestPhoenixIndexRpcSchedulerFactory.class.getName());
         NUM_SLAVES_BASE = 2;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet()
-                .iterator()));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
     }
     
     @AfterClass

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index b04f636..dbcd7ac 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.ipc.CallRunner;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -54,8 +54,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Maps;
-
 public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
 
     private static final String SCHEMA_NAME = "S";
@@ -65,12 +63,11 @@ public class PhoenixServerRpcIT extends BaseOwnClusterHBaseManagedTimeIT {
     
     @BeforeClass
     public static void doSetup() throws Exception {
-        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(2);
-        serverProps.put(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
-                TestPhoenixIndexRpcSchedulerFactory.class.getName());
-        serverProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ServerRpcControllerFactory.class.getName());
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
-        clientProps.put(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, RpcControllerFactory.class.getName());
+    	Map<String, String> serverProps = Collections.singletonMap(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, 
+        		TestPhoenixIndexRpcSchedulerFactory.class.getName());
+        // use the standard rpc controller for client rpc, so that we can isolate server rpc and ensure they use the correct queue  
+    	Map<String, String> clientProps = Collections.singletonMap(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, 
+    			RpcControllerFactory.class.getName());      
         NUM_SLAVES_BASE = 2;
         setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
index e721271..362e2cc 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -43,15 +45,19 @@ public class PhoenixRpcScheduler extends RpcScheduler {
 
     public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority) {
         // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4
-        int maxQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+    	int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
+    	int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT);
+        int maxIndexQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+        int maxMetadataQueueLength =  conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
         float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0);
-        int numQueues = Math.max(1, Math.round(callQueuesHandlersFactor));
+        int numIndexQueues = Math.max(1, Math.round(indexHandlerCount * callQueuesHandlersFactor));
+        int numMetadataQueues = Math.max(1, Math.round(metadataHandlerCount * callQueuesHandlersFactor));
 
         this.indexPriority = indexPriority;
         this.metadataPriority = metadataPriority;
         this.delegate = delegate;
-        this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", 1, numQueues, maxQueueLength);
-        this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", 1, numQueues, maxQueueLength);
+        this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, numIndexQueues, maxIndexQueueLength);
+        this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, numMetadataQueues, maxMetadataQueueLength);
     }
 
     @Override
@@ -120,4 +126,4 @@ public class PhoenixRpcScheduler extends RpcScheduler {
     }
     
     
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 65f6acf..7a911e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -152,6 +152,10 @@ public interface QueryServices extends SQLCloseable {
     public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells";
     public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls";
     public static final String METRICS_ENABLED = "phoenix.query.metrics.enabled";
+    
+    // rpc queue configs
+    public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count";
+    public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count";
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 97040d2..3561663 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -67,8 +67,10 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.trace.util.Tracing;
@@ -147,6 +149,8 @@ public class QueryServicesOptions {
     public static final int DEFAULT_INDEX_PRIORITY = 1000;
     public static final int DEFAULT_METADATA_PRIORITY = 2000;
     public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true;
+    public static final int DEFAULT_INDEX_HANDLER_COUNT = 30;
+    public static final int DEFAULT_METADATA_HANDLER_COUNT = 30;
 
     public static final int DEFAULT_TRACING_PAGE_SIZE = 100;
     /**
@@ -185,6 +189,8 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_AUTO_COMMIT = false;
     public static final boolean DEFAULT_IS_METRICS_ENABLED = true;
     
+    private static final String DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY = ClientRpcControllerFactory.class.getName();
+    
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {
@@ -237,7 +243,7 @@ public class QueryServicesOptions {
             .setIfUnset(NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_RETRIES_FOR_SCHEMA_UPDATE_CHECK)
             .setIfUnset(DELAY_FOR_SCHEMA_UPDATE_CHECK, DEFAULT_DELAY_FOR_SCHEMA_UPDATE_CHECK)
             .setIfUnset(METRICS_ENABLED, DEFAULT_IS_METRICS_ENABLED)
-            .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, ClientRpcControllerFactory.class.getName());
+            .setIfUnset(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_CLIENT_RPC_CONTROLLER_FACTORY)
             ;
         // HBase sets this to 1, so we reset it to something more appropriate.
         // Hopefully HBase will change this, because we can't know if a user set
@@ -508,8 +514,8 @@ public class QueryServicesOptions {
     public QueryServicesOptions setDelayInMillisForSchemaChangeCheck(long delayInMillis) {
         config.setLong(DELAY_FOR_SCHEMA_UPDATE_CHECK, delayInMillis);
         return this;
-    }
     
+    }
     public QueryServicesOptions setMetricsEnabled(boolean flag) {
         config.setBoolean(METRICS_ENABLED, flag);
         return this;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
index 0d3c461..d4956ee 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/jdbc/PhoenixTestDriver.java
@@ -55,14 +55,13 @@ public class PhoenixTestDriver extends PhoenixEmbeddedDriver {
     private boolean closed = false;
 
     public PhoenixTestDriver() {
-        this.overrideProps = ReadOnlyProps.EMPTY_PROPS;
-        queryServices = new QueryServicesTestImpl(getDefaultProps());
+        this(ReadOnlyProps.EMPTY_PROPS);
     }
 
     // For tests to override the default configuration
     public PhoenixTestDriver(ReadOnlyProps props) {
         overrideProps = props;
-        queryServices = new QueryServicesTestImpl(getDefaultProps(),overrideProps);
+        queryServices = new QueryServicesTestImpl(getDefaultProps(), overrideProps);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e2cf44c3/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 748ad19..e5884c3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -118,7 +118,12 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.coprocessor.RegionServerObserver;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.ServerRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.LocalIndexMerger;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
@@ -458,6 +463,8 @@ public abstract class BaseTest {
     
     private static final String ORG_ID = "00D300000000XHP";
     protected static int NUM_SLAVES_BASE = 1;
+    private static final String DEFAULT_SERVER_RPC_CONTROLLER_FACTORY = ServerRpcControllerFactory.class.getName();
+    private static final String DEFAULT_RPC_SCHEDULER_FACTORY = PhoenixRpcSchedulerFactory.class.getName();
     
     protected static String getZKClientPort(Configuration conf) {
         return conf.get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
@@ -613,6 +620,9 @@ public abstract class BaseTest {
         }
         //no point doing sanity checks when running tests.
         conf.setBoolean("hbase.table.sanity.checks", false);
+        // set the server rpc controller and rpc scheduler factory, used to configure the cluster
+        conf.set(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY, DEFAULT_SERVER_RPC_CONTROLLER_FACTORY);
+        conf.set(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS, DEFAULT_RPC_SCHEDULER_FACTORY);
         
         // override any defaults based on overrideProps
         for (Entry<String,String> entry : overrideProps) {