You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/24 22:13:25 UTC
[07/50] [abbrv] phoenix git commit: PHOENIX-2605 Enhance IndexToolIT to test transactional tables
PHOENIX-2605 Enhance IndexToolIT to test transactional tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b0122a54
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b0122a54
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b0122a54
Branch: refs/heads/calcite
Commit: b0122a541325fd7e40e62e3602eb0ad748b94a4f
Parents: e4d569c
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jan 29 14:10:11 2016 -0800
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Mon Feb 8 11:38:47 2016 -0800
----------------------------------------------------------------------
.../phoenix/end2end/ContextClassloaderIT.java | 2 +-
.../phoenix/end2end/CsvBulkLoadToolIT.java | 87 ++----
.../org/apache/phoenix/end2end/IndexToolIT.java | 273 +++++++------------
.../phoenix/end2end/MutableIndexToolIT.java | 128 +++++++++
.../phoenix/end2end/UserDefinedFunctionsIT.java | 8 +-
.../end2end/index/DropIndexDuringUpsertIT.java | 2 +-
.../index/MutableIndexReplicationIT.java | 2 +-
.../example/EndToEndCoveredIndexingIT.java | 5 +-
.../coprocessor/BaseScannerRegionObserver.java | 7 +
.../phoenix/mapreduce/PhoenixInputFormat.java | 11 +-
.../phoenix/mapreduce/index/IndexTool.java | 9 +-
.../index/PhoenixIndexImportDirectMapper.java | 7 +-
.../index/PhoenixIndexImportMapper.java | 6 +-
.../util/PhoenixConfigurationUtil.java | 6 +-
.../apache/phoenix/schema/MetaDataClient.java | 1 -
.../java/org/apache/phoenix/query/BaseTest.java | 56 +++-
16 files changed, 337 insertions(+), 273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
index 7d0e1da..4c67b32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
@@ -62,7 +62,7 @@ public class ContextClassloaderIT extends BaseTest {
String clientPort = hbaseTestUtil.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
String url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+ JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
- driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+ driver = initAndRegisterTestDriver(url, ReadOnlyProps.EMPTY_PROPS);
Connection conn = DriverManager.getConnection(url);
Statement stmt = conn.createStatement();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 6bc03bf..26ec889 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -30,65 +29,37 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.AfterClass;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-@Category(NeedsOwnMiniClusterTest.class)
-public class CsvBulkLoadToolIT {
+import com.google.common.collect.Maps;
+
+public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
- // We use HBaseTestUtil because we need to start up a MapReduce cluster as well
- private static HBaseTestingUtility hbaseTestUtil;
- private static String zkQuorum;
private static Connection conn;
+ private static String zkQuorum;
@BeforeClass
- public static void setUp() throws Exception {
- hbaseTestUtil = new HBaseTestingUtility();
- Configuration conf = hbaseTestUtil.getConfiguration();
- setUpConfigForMiniCluster(conf);
- // Since we're using the real PhoenixDriver in this test, remove the
- // extra JDBC argument that causes the test driver to be used.
- conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- hbaseTestUtil.startMiniCluster();
- hbaseTestUtil.startMiniMapReduceCluster();
-
- Class.forName(PhoenixDriver.class.getName());
- DriverManager.registerDriver(PhoenixDriver.INSTANCE);
- zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
- conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
- + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- try {
- if (conn != null) conn.close();
- } finally {
- try {
- DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
- } finally {
- try {
- hbaseTestUtil.shutdownMiniMapReduceCluster();
- } finally {
- hbaseTestUtil.shutdownMiniCluster();
- }
- }
- }
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+ setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+ conn = DriverManager.getConnection(getUrl());
}
@Test
@@ -97,7 +68,7 @@ public class CsvBulkLoadToolIT {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1,Name 1,1970/01/01");
@@ -105,7 +76,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input1.csv",
@@ -136,7 +107,7 @@ public class CsvBulkLoadToolIT {
stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
"NAME VARCHAR, NAMES VARCHAR ARRAY)");
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1|Name 1a;Name 1b");
@@ -144,7 +115,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input2.csv",
"--table", "table2",
@@ -173,7 +144,7 @@ public class CsvBulkLoadToolIT {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1,Name 1,1970/01/01");
@@ -184,7 +155,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+ csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input1.csv,/tmp/input2.csv",
@@ -218,7 +189,7 @@ public class CsvBulkLoadToolIT {
+ " INCLUDE (LAST_NAME)";
stmt.execute(ddl);
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1,FirstName 1,LastName 1");
@@ -226,7 +197,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input3.csv",
"--table", "table3",
@@ -254,7 +225,7 @@ public class CsvBulkLoadToolIT {
ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
stmt.execute(ddl);
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1,FirstName 1,LastName 1");
@@ -262,7 +233,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
try {
csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input3.csv",
@@ -296,7 +267,7 @@ public class CsvBulkLoadToolIT {
+ tableName + "(FIRST_NAME ASC)";
stmt.execute(ddl);
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("1,FirstName 1,LastName 1");
@@ -304,7 +275,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
int exitCode = csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input4.csv",
"--table", tableName,
@@ -326,7 +297,7 @@ public class CsvBulkLoadToolIT {
public void testInvalidArguments() {
String tableName = "TABLE8";
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
try {
csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input4.csv",
@@ -348,7 +319,7 @@ public class CsvBulkLoadToolIT {
stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
- FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
fs.create(new Path(outputPath));
FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
PrintWriter printWriter = new PrintWriter(outputStream);
@@ -357,7 +328,7 @@ public class CsvBulkLoadToolIT {
printWriter.close();
CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
- csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+ csvBulkLoadTool.setConf(getUtility().getConfiguration());
csvBulkLoadTool.run(new String[] {
"--input", "/tmp/input9.csv",
"--output", outputPath,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 062b303..aba9c11 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,9 +17,9 @@
*/
package org.apache.phoenix.end2end;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -28,117 +28,114 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* Tests for the {@link IndexTool}
*/
-@Category(NeedsOwnMiniClusterTest.class)
-public class IndexToolIT {
+@RunWith(Parameterized.class)
+public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
- private static HBaseTestingUtility hbaseTestUtil;
- private static String zkQuorum;
-
- @BeforeClass
- public static void setUp() throws Exception {
- hbaseTestUtil = new HBaseTestingUtility();
- Configuration conf = hbaseTestUtil.getConfiguration();
- conf.setBoolean("hbase.defaults.for.version.skip", true);
- // Since we're using the real PhoenixDriver in this test, remove the
- // extra JDBC argument that causes the test driver to be used.
- conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- setUpConfigForMiniCluster(conf);
- hbaseTestUtil.startMiniCluster();
- hbaseTestUtil.startMiniMapReduceCluster();
- Class.forName(PhoenixDriver.class.getName());
- DriverManager.registerDriver(PhoenixDriver.INSTANCE);
- zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
- }
-
- @Test
- public void testImmutableGlobalIndex() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
- }
-
- @Test
- public void testImmutableLocalIndex() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
- }
-
- @Test
- public void testMutableGlobalIndex() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
- }
+ private final String schemaName;
+ private final String dataTable;
- @Test
- public void testMutableLocalIndex() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
- }
+ private final boolean localIndex;
+ private final boolean transactional;
+ private final boolean directApi;
+ private final String tableDDLOptions;
- @Test
- public void testImmutableGlobalIndexDirectApi() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
+ public IndexToolIT(boolean transactional, boolean localIndex, boolean mutable, boolean directApi) {
+ this.schemaName = "S";
+ this.dataTable = "T" + (transactional ? "_TXN" : "");
+ this.localIndex = localIndex;
+ this.transactional = transactional;
+ this.directApi = directApi;
+ StringBuilder optionBuilder = new StringBuilder();
+ if (!mutable)
+ optionBuilder.append(" IMMUTABLE_ROWS=true ");
+ if (transactional) {
+ if (!(optionBuilder.length()==0))
+ optionBuilder.append(",");
+ optionBuilder.append(" TRANSACTIONAL=true ");
+ }
+ this.tableDDLOptions = optionBuilder.toString();
}
- @Test
- public void testImmutableLocalIndexDirectApi() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+ clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+ setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @Test
- public void testMutableGlobalIndexDirectApi() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
+ @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi = {3}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] {
+ { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true },
+ { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true },
+ { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true },
+ { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true }
+ });
}
@Test
- public void testMutableLocalIndexDirectApi() throws Exception {
- testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
- }
-
- public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
- testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
- }
-
- public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
-
- final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
- final String indxTable = String.format("%s_%s",dataTable,"INDX");
+ public void testSecondaryIndex() throws Exception {
+ final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
+ final String indxTable = String.format("%s_%s", dataTable, "INDX");
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+ props.setProperty(QueryServices.TRANSACTIONS_ENABLED, "true");
+ Connection conn = DriverManager.getConnection(getUrl(), props);
Statement stmt = conn.createStatement();
try {
- stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+ stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, tableDDLOptions));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
- int id = 1;
// insert two rows
- upsertRow(stmt1, id++);
- upsertRow(stmt1, id++);
+ upsertRow(stmt1, 1);
+ upsertRow(stmt1, 2);
conn.commit();
- stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
+ if (transactional) {
+ // insert two rows in another connection without committing so that they are not visible to other transactions
+ try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+ PreparedStatement stmt2 = conn.prepareStatement(upsertQuery);
+ upsertRow(stmt2, 5);
+ upsertRow(stmt2, 6);
+ ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) from "+fullTableName);
+ assertTrue(rs.next());
+ assertEquals("Unexpected row count ", 4, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ stmt.execute(String.format("CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
//verify rows are fetched from data table.
String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName);
@@ -153,10 +150,12 @@ public class IndexToolIT {
assertEquals("xxUNAME1_xyz", rs.getString(1));
assertTrue(rs.next());
assertEquals("xxUNAME2_xyz", rs.getString(1));
+ assertFalse(rs.next());
+ conn.commit();
//run the index MR job.
final IndexTool indexingTool = new IndexTool();
- indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+ indexingTool.setConf(new Configuration(getUtility().getConfiguration()));
final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
int status = indexingTool.run(cmdArgs);
@@ -167,115 +166,39 @@ public class IndexToolIT {
upsertRow(stmt1, 4);
conn.commit();
- rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
+ rs = stmt1.executeQuery("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM "+fullTableName);
//assert we are pulling from index table.
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
actualExplainPlan = QueryUtil.getExplainPlan(rs);
- assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
+ assertExplainPlan(actualExplainPlan, schemaName, dataTable, indxTable, localIndex);
rs = stmt.executeQuery(selectSql);
-// assertTrue(rs.next());
-// assertEquals("xxUNAME1_xyz", rs.getString(1));
-// assertEquals(1, rs.getInt(2));
-//
-// assertTrue(rs.next());
-// assertEquals("xxUNAME2_xyz", rs.getString(1));
-// assertEquals(2, rs.getInt(2));
-//
-// assertTrue(rs.next());
-// assertEquals("xxUNAME3_xyz", rs.getString(1));
-// assertEquals(3, rs.getInt(2));
-//
-// assertTrue(rs.next());
-// assertEquals("xxUNAME4_xyz", rs.getString(1));
-// assertEquals(4, rs.getInt(2));
-//
-// assertFalse(rs.next());
-
- conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , fullTableName));
- } finally {
- conn.close();
- }
- }
-
-
- /**
- * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before
- * the MR job runs, do show up in the index table .
- * @throws Exception
- */
- @Test
- public void testMutalbleIndexWithUpdates() throws Exception {
-
- final String dataTable = "DATA_TABLE5";
- final String indxTable = String.format("%s_%s",dataTable,"INDX");
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
- Statement stmt = conn.createStatement();
- try {
-
- stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
- int id = 1;
- // insert two rows
- upsertRow(stmt1, id++);
- upsertRow(stmt1, id++);
- conn.commit();
-
- stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
-
- //update a row
- stmt1.setInt(1, 1);
- stmt1.setString(2, "uname" + String.valueOf(10));
- stmt1.setInt(3, 95050 + 1);
- stmt1.executeUpdate();
- conn.commit();
-
- //verify rows are fetched from data table.
- String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
- ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
- String actualExplainPlan = QueryUtil.getExplainPlan(rs);
-
- //assert we are pulling from data table.
- assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+ assertTrue(rs.next());
+ assertEquals("xxUNAME1_xyz", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
- rs = stmt1.executeQuery(selectSql);
assertTrue(rs.next());
- assertEquals("UNAME10", rs.getString(1));
+ assertEquals("xxUNAME2_xyz", rs.getString(1));
+ assertEquals(2, rs.getInt(2));
+
assertTrue(rs.next());
- assertEquals("UNAME2", rs.getString(1));
-
- //run the index MR job.
- final IndexTool indexingTool = new IndexTool();
- indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-
- final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
- int status = indexingTool.run(cmdArgs);
- assertEquals(0, status);
-
- //assert we are pulling from index table.
- rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
- actualExplainPlan = QueryUtil.getExplainPlan(rs);
- assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
+ assertEquals("xxUNAME3_xyz", rs.getString(1));
+ assertEquals(3, rs.getInt(2));
- rs = stmt.executeQuery(selectSql);
assertTrue(rs.next());
- assertEquals("UNAME10", rs.getString(1));
- assertEquals(1, rs.getInt(2));
+ assertEquals("xxUNAME4_xyz", rs.getString(1));
+ assertEquals(4, rs.getInt(2));
+
+ assertFalse(rs.next());
- assertTrue(rs.next());
- assertEquals("UNAME2", rs.getString(1));
- assertEquals(2, rs.getInt(2));
- conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable));
+ conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , fullTableName));
} finally {
conn.close();
}
}
- private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+ public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
String indxTable, boolean isLocal) {
String expectedExplainPlan = "";
@@ -290,11 +213,11 @@ public class IndexToolIT {
assertEquals(expectedExplainPlan,actualExplainPlan);
}
- private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
+ public static String[] getArgValues(String schemaName, String dataTable, String indxTable) {
return getArgValues(schemaName, dataTable, indxTable, false);
}
- private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
+ public static String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
final List<String> args = Lists.newArrayList();
if (schemaName!=null) {
args.add("-s");
@@ -315,24 +238,12 @@ public class IndexToolIT {
return args.toArray(new String[0]);
}
- private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+ public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
// insert row
stmt.setInt(1, i);
stmt.setString(2, "uname" + String.valueOf(i));
stmt.setInt(3, 95050 + i);
stmt.executeUpdate();
}
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- try {
- DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
- } finally {
- try {
- hbaseTestUtil.shutdownMiniMapReduceCluster();
- } finally {
- hbaseTestUtil.shutdownMiniCluster();
- }
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
new file mode 100644
index 0000000..0791479
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class MutableIndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+ serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+ setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
+ }
+
+ /**
+ * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode, but before
+ * the MR job runs, show up in the index table.
+ * @throws Exception
+ */
+ @Test
+ public void testMutableIndexWithUpdates() throws Exception {
+
+ final String dataTable = "DATA_TABLE5";
+ final String indxTable = String.format("%s_%s",dataTable,"INDX");
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement();
+ try {
+
+ stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ int id = 1;
+ // insert two rows
+ IndexToolIT.upsertRow(stmt1, id++);
+ IndexToolIT.upsertRow(stmt1, id++);
+ conn.commit();
+
+ stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
+
+ //update a row
+ stmt1.setInt(1, 1);
+ stmt1.setString(2, "uname" + String.valueOf(10));
+ stmt1.setInt(3, 95050 + 1);
+ stmt1.executeUpdate();
+ conn.commit();
+
+ //verify rows are fetched from data table.
+ String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+ //assert we are pulling from data table.
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+
+ rs = stmt1.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME10", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+
+ //run the index MR job.
+ final IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(new Configuration(getUtility().getConfiguration()));
+
+ final String[] cmdArgs = IndexToolIT.getArgValues(null, dataTable,indxTable);
+ int status = indexingTool.run(cmdArgs);
+ assertEquals(0, status);
+
+ //assert we are pulling from index table.
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ IndexToolIT.assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
+
+ rs = stmt.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME10", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
+
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+ assertEquals(2, rs.getInt(2));
+ conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable));
+ } finally {
+ conn.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
index 4d1c5e4..4eee422 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
@@ -30,12 +30,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -54,11 +51,8 @@ import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.io.IOUtils;
import org.apache.phoenix.expression.function.UDFExpression;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.QueryServices;
@@ -259,7 +253,7 @@ public class UserDefinedFunctionsIT extends BaseOwnClusterIT{
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
props.put(QueryServices.DYNAMIC_JARS_DIR_KEY,string+"/hbase/tmpjars/");
- driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+ driver = initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 1);
compileTestClass(MY_SUM_CLASS_NAME, MY_SUM_PROGRAM, 2);
compileTestClass(MY_ARRAY_INDEX_CLASS_NAME, MY_ARRAY_INDEX_PROGRAM, 3);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
index df28e65..acc089b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -91,7 +91,7 @@ public abstract class DropIndexDuringUpsertIT extends BaseTest {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
// Must update config before starting server
props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
- driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+ driver = initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
}
@After
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
index a4f4682..2568566 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
@@ -180,7 +180,7 @@ public class MutableIndexReplicationIT extends BaseTest {
// Must update config before starting server
URL = getLocalClusterUrl(utility1);
LOG.info("Connecting driver to "+URL);
- driver = initAndRegisterDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
+ driver = initAndRegisterTestDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
}
@Test
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
index 01812f3..5102dc8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
@@ -17,8 +17,7 @@
*/
package org.apache.phoenix.hbase.index.covered.example;
-
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+import static org.apache.phoenix.query.BaseTest.initAndRegisterTestDriver;
import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import java.io.IOException;
@@ -123,7 +122,7 @@ public class EndToEndCoveredIndexingIT {
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum +
PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR +
conf.get(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
- initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+ initAndRegisterTestDriver(url, ReadOnlyProps.EMPTY_PROPS);
}
@BeforeClass
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index c95cd5d..a363459 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -95,6 +96,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER";
public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync";
public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
+ public static final String TX_SCN = "_TxScn";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -154,6 +156,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s) throws IOException {
+ byte[] txnScn = scan.getAttribute(TX_SCN);
+ if (txnScn!=null) {
+ TimeRange timeRange = scan.getTimeRange();
+ scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
+ }
if (isRegionObserverFor(scan)) {
if (! skipRegionBoundaryCheck(scan)) {
throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 8ee1634..5882c14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -37,11 +37,13 @@ import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
import com.google.common.base.Preconditions;
@@ -104,11 +106,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
throws IOException {
Preconditions.checkNotNull(context);
try {
+ final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
final Properties overridingProps = new Properties();
- if(currentScnValue != null) {
+ if(txnScnValue==null && currentScnValue!=null) {
overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
}
+ overridingProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
Preconditions.checkNotNull(selectStatement);
@@ -116,6 +120,11 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
// Optimize the query plan so that we potentially use secondary indexes
final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+ // Since an SCN can't be set on transactional connections, set the TX_SCN attribute so that BaseScannerRegionObserver sets the max time range
+ if (txnScnValue!=null) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+ }
// Initialize the query plan so it sets up the parallel scans
queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
return queryPlan;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 8a4f963..f5117fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -191,9 +192,13 @@ public class IndexTool extends Configured implements Tool {
final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
// this is set to ensure index tables remains consistent post population.
- long indxTimestamp = pindexTable.getTimeStamp();
+ long maxTimeRange = pindexTable.getTimeStamp()+1;
+ if (pdataTable.isTransactional()) {
+ configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+ Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+ }
configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
- Long.toString(indxTimestamp + 1));
+ Long.toString(maxTimeRange));
// check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
// computed from the qDataTable name.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 3bc3808..9c64efc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -80,8 +80,11 @@ public class PhoenixIndexImportDirectMapper extends
indxWritable.setColumnMetadata(indxTblColumnMetadata);
final Properties overrideProps = new Properties();
- overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
- configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+ String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ if(txScnValue==null) {
+ overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+ }
connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
connection.setAutoCommit(false);
// Get BatchSize
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 517ce91..093b93d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -72,7 +72,11 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration);
indexTableName = PhoenixConfigurationUtil.getPhysicalTableName(configuration);
final Properties overrideProps = new Properties ();
- overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+ String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ if(txScnValue==null) {
+ overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+ }
connection = ConnectionUtil.getOutputConnection(configuration,overrideProps);
connection.setAutoCommit(false);
final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9e29fba..280daa2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.mapreduce.util;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
@@ -45,8 +47,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
/**
* A utility class to set properties on the {#link Configuration} instance.
* Used as part of Map Reduce job configuration.
@@ -87,6 +87,8 @@ public final class PhoenixConfigurationUtil {
public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value";
+ public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value";
+
/** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e8d995c..ac2062a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1013,7 +1013,6 @@ public class MetaDataClient {
private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
AlterIndexStatement indexStatement = null;
boolean wasAutoCommit = connection.getAutoCommit();
- connection.rollback();
try {
connection.setAutoCommit(true);
MutationPlan mutationPlan;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 951bfce..a67a530 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.query;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -119,12 +118,6 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -179,6 +172,12 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -508,6 +507,7 @@ public abstract class BaseTest {
protected static String url;
protected static PhoenixTestDriver driver;
+ protected static PhoenixDriver realDriver;
protected static boolean clusterInitialized = false;
private static HBaseTestingUtility utility;
protected static final Configuration config = HBaseConfiguration.create();
@@ -588,9 +588,16 @@ public abstract class BaseTest {
assertTrue(destroyDriver(driver));
} finally {
driver = null;
- teardownTxManager();
}
}
+ if (realDriver != null) {
+ try {
+ assertTrue(destroyDriver(realDriver));
+ } finally {
+ realDriver = null;
+ }
+ }
+ teardownTxManager();
}
protected static void dropNonSystemTables() throws Exception {
@@ -607,7 +614,11 @@ public abstract class BaseTest {
} finally {
try {
if (utility != null) {
- utility.shutdownMiniCluster();
+ try {
+ utility.shutdownMiniMapReduceCluster();
+ } finally {
+ utility.shutdownMiniCluster();
+ }
}
} finally {
utility = null;
@@ -623,12 +634,33 @@ public abstract class BaseTest {
protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
String url = checkClusterInitialized(serverProps);
if (driver == null) {
- driver = initAndRegisterDriver(url, clientProps);
+ driver = initAndRegisterTestDriver(url, clientProps);
if (clientProps.getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
setupTxManager();
}
}
}
+
+ protected static void setUpRealDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
+ if (!clusterInitialized) {
+ setUpConfigForMiniCluster(config, serverProps);
+ utility = new HBaseTestingUtility(config);
+ try {
+ utility.startMiniCluster(NUM_SLAVES_BASE);
+ utility.startMiniMapReduceCluster();
+ url = QueryUtil.getConnectionUrl(new Properties(), utility.getConfiguration());
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ clusterInitialized = true;
+ }
+ Class.forName(PhoenixDriver.class.getName());
+ realDriver = PhoenixDriver.INSTANCE;
+ DriverManager.registerDriver(realDriver);
+ if (clientProps.getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+ setupTxManager();
+ }
+ }
private static boolean isDistributedClusterModeEnabled(Configuration conf) {
boolean isDistributedCluster = false;
@@ -739,7 +771,7 @@ public abstract class BaseTest {
* Create a {@link PhoenixTestDriver} and register it.
* @return an initialized and registered {@link PhoenixTestDriver}
*/
- public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+ public static PhoenixTestDriver initAndRegisterTestDriver(String url, ReadOnlyProps props) throws Exception {
PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
DriverManager.registerDriver(newDriver);
Driver oldDriver = DriverManager.getDriver(url);
@@ -1804,7 +1836,7 @@ public abstract class BaseTest {
assertEquals(expectedCount, count);
}
- public HBaseTestingUtility getUtility() {
+ public static HBaseTestingUtility getUtility() {
return utility;
}