You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by jy...@apache.org on 2014/10/06 20:56:42 UTC
git commit: PHOENIX-1107 Support mutable indexes over replication
Repository: phoenix
Updated Branches:
refs/heads/4.0 e9094d0a4 -> 763f10f00
PHOENIX-1107 Support mutable indexes over replication
Adding test to ensure that we still have indexes working over replication, rather than
just relying on the fact that it 'just works'.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/763f10f0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/763f10f0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/763f10f0
Branch: refs/heads/4.0
Commit: 763f10f00ff5f26c1a2df9b19f430253ee331d90
Parents: e9094d0
Author: Jesse Yates <jy...@apache.org>
Authored: Mon Oct 6 11:50:47 2014 -0700
Committer: Jesse Yates <jy...@apache.org>
Committed: Mon Oct 6 11:55:11 2014 -0700
----------------------------------------------------------------------
.../index/MutableIndexReplicationIT.java | 280 +++++++++++++++++++
1 file changed, 280 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/763f10f0/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
new file mode 100644
index 0000000..9981ed8
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.*;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test that we correctly replicate indexes over replication
+ * <p>
+ * Code for setUp/teardown copied from org.apache.hadoop.hbase.replication.TestReplicationBase in
+ * HBase 0.98.5
+ * </p>
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class MutableIndexReplicationIT extends BaseTest {
+
+ private static final Log LOG = LogFactory.getLog(MutableIndexReplicationIT.class);
+
+ public static final String SCHEMA_NAME = "";
+ public static final String DATA_TABLE_NAME = "T";
+ public static final String INDEX_TABLE_NAME = "I";
+ public static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
+ public static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+ private static final long REPLICATION_WAIT_TIME_MILLIS = 10000;
+
+ protected static PhoenixTestDriver driver;
+ private static String URL;
+
+ protected static Configuration conf1 = HBaseConfiguration.create();
+ protected static Configuration conf2;
+
+ protected static ZooKeeperWatcher zkw1;
+ protected static ZooKeeperWatcher zkw2;
+
+ protected static ReplicationAdmin admin;
+
+ protected static HBaseTestingUtility utility1;
+ protected static HBaseTestingUtility utility2;
+ protected static final int REPLICATION_RETRIES = 100;
+
+ protected static final byte[] tableName = Bytes.toBytes("test");
+ protected static final byte[] row = Bytes.toBytes("row");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupConfigsAndStartCluster();
+ setupDriver();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ utility2.shutdownMiniCluster();
+ utility1.shutdownMiniCluster();
+ }
+
+ private static void setupConfigsAndStartCluster() throws Exception {
+ // cluster-1 lives at regular HBase home, so we don't need to change how phoenix handles
+ // lookups
+// conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ // smaller log roll size to trigger more events
+ conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+ conf1.setInt("replication.source.size.capacity", 10240);
+ conf1.setLong("replication.source.sleepforretries", 100);
+ conf1.setInt("hbase.regionserver.maxlogs", 10);
+ conf1.setLong("hbase.master.logcleaner.ttl", 10);
+ conf1.setInt("zookeeper.recovery.retry", 1);
+ conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
+ conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+ conf1.setBoolean("dfs.support.append", true);
+ conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf1.setInt("replication.stats.thread.period.seconds", 5);
+ conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+ // add the phoenix-specific configs
+ BaseTest.setUpConfigForMiniCluster(conf1);
+
+ utility1 = new HBaseTestingUtility(conf1);
+ utility1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = utility1.getZkCluster();
+ // Have to reset conf1 in case zk cluster location different
+ // than default
+ conf1 = utility1.getConfiguration();
+ zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null, true);
+ admin = new ReplicationAdmin(conf1);
+ LOG.info("Setup first Zk");
+
+ // Base conf2 on conf1 so it gets the right zk cluster, and general cluster configs
+ conf2 = HBaseConfiguration.create(conf1);
+ conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
+ conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+ conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+ conf2.setBoolean("dfs.support.append", true);
+ conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+
+ utility2 = new HBaseTestingUtility(conf2);
+ utility2.setZkCluster(miniZK);
+ zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
+
+ //replicate from cluster 1 -> cluster 2, but not back again
+ admin.addPeer("1", utility2.getClusterKey());
+
+ LOG.info("Setup second Zk");
+ utility1.startMiniCluster(2);
+ utility2.startMiniCluster(2);
+ }
+
+ private static void setupDriver() throws Exception {
+ LOG.info("Setting up phoenix driver");
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
+ // Forces server cache to be used
+ props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ // Must update config before starting server
+ URL = getLocalClusterUrl(utility1);
+ LOG.info("Connecting driver to "+URL);
+ setUpTestDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ protected static void setUpTestDriver(String url, ReadOnlyProps props) throws Exception {
+ if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+ if (driver == null) {
+ driver = initAndRegisterDriver(url, props);
+ }
+ }
+ }
+
+ @Test
+ public void testReplicationWithMutableIndexes() throws Exception {
+ Connection conn = getConnection();
+
+ //create the primary and index tables
+ conn.createStatement().execute(
+ "CREATE TABLE " + DATA_TABLE_FULL_NAME
+ + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ conn.createStatement().execute(
+ "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME
+ + " (v1)");
+
+ // make sure that the tables are empty, but reachable
+ String query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
+ ResultSet rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ //make sure there is no data in the table
+ query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+ rs = conn.createStatement().executeQuery(query);
+ assertFalse(rs.next());
+
+ // make sure the data tables are created on the remote cluster
+ HBaseAdmin admin = utility1.getHBaseAdmin();
+ HBaseAdmin admin2 = utility2.getHBaseAdmin();
+
+ List<String> dataTables = new ArrayList<String>();
+ dataTables.add(DATA_TABLE_FULL_NAME);
+ dataTables.add(INDEX_TABLE_FULL_NAME);
+ for (String tableName : dataTables) {
+ HTableDescriptor desc = admin.getTableDescriptor(TableName.valueOf(tableName));
+
+ //create it as-is on the remote cluster
+ admin2.createTable(desc);
+
+ LOG.info("Enabling replication on source table: "+tableName);
+ HColumnDescriptor[] cols = desc.getColumnFamilies();
+ assertEquals(1, cols.length);
+ // add the replication scope to the column
+ HColumnDescriptor col = desc.removeFamily(cols[0].getName());
+ col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+ desc.addFamily(col);
+ //disable/modify/enable table so it has replication enabled
+ admin.disableTable(desc.getTableName());
+ admin.modifyTable(tableName, desc);
+ admin.enableTable(desc.getTableName());
+ LOG.info("Replication enabled on source table: "+tableName);
+ }
+
+
+ // load some data into the source cluster table
+ PreparedStatement stmt =
+ conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+ stmt.setString(1, "a"); // k
+ stmt.setString(2, "x"); // v1 <- has index
+ stmt.setString(3, "1"); // v2
+ stmt.execute();
+ conn.commit();
+
+ // make sure the index is working as expected
+ query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("x", rs.getString(1));
+ assertFalse(rs.next());
+ conn.close();
+
+ /*
+ Validate that we have replicated the rows to the remote cluster
+ */
+
+ // other table can't be reached through Phoenix right now - would need to change how we
+ // lookup tables. For right now, we just go through an HTable
+ LOG.info("Looking up tables in replication target");
+ TableName[] tables = admin2.listTableNames();
+ HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]);
+ for (int i = 0; i < REPLICATION_RETRIES; i++) {
+ if (i >= REPLICATION_RETRIES - 1) {
+ fail("Waited too much time for put replication on table " + remoteTable
+ .getTableDescriptor().getNameAsString());
+ }
+ if (ensureAnyRows(remoteTable)) {
+ break;
+ }
+ LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
+ + " for edits to get replicated");
+ Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
+ }
+ remoteTable.close();
+ }
+
+ private boolean ensureAnyRows(HTable remoteTable) throws IOException {
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ ResultScanner scanner = remoteTable.getScanner(scan);
+ boolean found = false;
+ for (Result r : scanner) {
+ LOG.info("got row: " + r);
+ found = true;
+ }
+ scanner.close();
+ return found;
+ }
+
+ private static Connection getConnection() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ return DriverManager.getConnection(URL, props);
+ }
+}
\ No newline at end of file