You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/07/16 07:50:59 UTC
phoenix git commit: PHOENIX-3994 Addendum - set index rpc controller
factory for transactional indexer
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 b385105ee -> 270ef2153
PHOENIX-3994 Addendum - set index rpc controller factory for transactional indexer
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/270ef215
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/270ef215
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/270ef215
Branch: refs/heads/4.x-HBase-0.98
Commit: 270ef2153eaf052c8607cf0dc3a6ecfc02fbcee4
Parents: b385105
Author: Samarth Jain <sa...@apache.org>
Authored: Sun Jul 16 00:51:00 2017 -0700
Committer: Samarth Jain <sa...@apache.org>
Committed: Sun Jul 16 00:51:00 2017 -0700
----------------------------------------------------------------------
.../index/PhoenixTransactionalIndexer.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/270ef215/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 4758848..36e3de9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -51,11 +52,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.MultiMutation;
import org.apache.phoenix.hbase.index.ValueGetter;
@@ -76,6 +80,7 @@ import org.apache.phoenix.transaction.PhoenixTransactionContext;
import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel;
import org.apache.phoenix.transaction.PhoenixTransactionalTable;
import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -111,11 +116,20 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
String serverName = env.getRegionServerServices().getServerName().getServerName();
codec = new PhoenixIndexCodec();
codec.initialize(env);
-
+ // Clone the config since it is shared
+ Configuration clonedConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
+ /*
+ * Set the rpc controller factory so that the HTables used by IndexWriter would
+ * set the correct priorities on the remote RPC calls.
+ */
+ clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
+ InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+ DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
+ // setup the actual index writer
// setup the actual index writer
// For transactional tables, we keep the index active upon a write failure
// since we have the all versus none behavior for transactions.
- this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), env, serverName + "-tx-index-writer");
+ this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
}
@Override