You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/03/12 04:55:03 UTC
[1/2] phoenix git commit: fix for unescaping the columns
Repository: phoenix
Updated Branches:
refs/heads/master 80a50c964 -> 2bbbcfdbe
fix for unescaping the columns
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bbbcfdb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bbbcfdb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bbbcfdb
Branch: refs/heads/master
Commit: 2bbbcfdbe873e12e3d89a8187f9ad7837e59eddd
Parents: 55f64ea
Author: ravimagham <ra...@apache.org>
Authored: Wed Mar 11 12:48:35 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Wed Mar 11 20:41:36 2015 -0700
----------------------------------------------------------------------
.../main/java/org/apache/phoenix/util/ColumnInfo.java | 7 ++++---
.../main/java/org/apache/phoenix/util/SchemaUtil.java | 11 +++++++++++
.../phoenix/pig/util/PhoenixPigSchemaUtilTest.java | 4 ++--
3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 46350be..3f94b92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -56,11 +56,12 @@ public class ColumnInfo {
* @return
*/
public String getDisplayName() {
- int index = columnName.indexOf(QueryConstants.NAME_SEPARATOR);
+ final String unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName);
+ int index = unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
if (index < 0) {
- return columnName;
+ return unescapedColumnName;
}
- return columnName.substring(index+1);
+ return unescapedColumnName.substring(index+1).trim();
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 1d986c6..2a1d3ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -681,4 +681,15 @@ public class SchemaUtil {
checkArgument(!isNullOrEmpty(columnName), "Column name cannot be null or empty");
return columnFamilyName == null ? ("\"" + columnName + "\"") : ("\"" + columnFamilyName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + columnName + "\"");
}
+
+ /**
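+ * Removes every occurrence of {@link #ESCAPE_CHARACTER} from the given column name and trims surrounding whitespace.
+ * @param fullColumnName the (possibly escaped) full column name; must not be null or empty
+ * @return the column name with all escape characters removed and leading/trailing whitespace trimmed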
+ */
+ public static String getUnEscapedFullColumnName(String fullColumnName) {
+ checkArgument(!isNullOrEmpty(fullColumnName), "Column name cannot be null or empty");
+ fullColumnName = fullColumnName.replaceAll(ESCAPE_CHARACTER, "");
+ return fullColumnName.trim();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index abfb442..24d27b1 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -66,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
// expected schema.
final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
- fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
+ fields[0] = new ResourceFieldSchema().setName("ID")
.setType(DataType.LONG);
- fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
+ fields[1] = new ResourceFieldSchema().setName("NAME")
.setType(DataType.CHARARRAY);
final ResourceSchema expected = new ResourceSchema().setFields(fields);
[2/2] phoenix git commit: PHOENIX-1609-4.0
Posted by ra...@apache.org.
PHOENIX-1609-4.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55f64eaf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55f64eaf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55f64eaf
Branch: refs/heads/master
Commit: 55f64eaf3d48bb788ec3f6d4e9d50b1d2baf5b9f
Parents: 80a50c9
Author: ravimagham <ra...@apache.org>
Authored: Thu Mar 5 16:18:57 2015 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Wed Mar 11 20:41:36 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/mapreduce/IndexToolIT.java | 296 ++++++++++++++++++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 7 +-
.../phoenix/compile/PostIndexDDLCompiler.java | 36 ++-
.../apache/phoenix/jdbc/PhoenixStatement.java | 8 +-
.../phoenix/mapreduce/CsvBulkImportUtil.java | 6 +-
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 42 +--
.../phoenix/mapreduce/PhoenixInputFormat.java | 9 +-
.../phoenix/mapreduce/PhoenixJobCounters.java | 29 ++
.../phoenix/mapreduce/index/IndexTool.java | 302 +++++++++++++++++++
.../mapreduce/index/PhoenixIndexDBWritable.java | 91 ++++++
.../index/PhoenixIndexImportMapper.java | 133 ++++++++
.../phoenix/mapreduce/util/ConnectionUtil.java | 14 +
.../util/PhoenixConfigurationUtil.java | 21 ++
.../phoenix/parse/CreateIndexStatement.java | 8 +-
.../apache/phoenix/parse/ParseNodeFactory.java | 4 +-
.../apache/phoenix/schema/MetaDataClient.java | 5 +
.../org/apache/phoenix/util/ColumnInfo.java | 13 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 46 ++-
.../org/apache/phoenix/util/SchemaUtil.java | 3 +
.../mapreduce/CsvBulkImportUtilTest.java | 14 +-
.../mapreduce/CsvToKeyValueMapperTest.java | 26 +-
.../org/apache/phoenix/util/ColumnInfoTest.java | 8 +-
.../org/apache/phoenix/util/QueryUtilTest.java | 2 +-
.../pig/util/PhoenixPigSchemaUtilTest.java | 5 +-
24 files changed, 1048 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
new file mode 100644
index 0000000..2b7b16b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link IndexTool}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolIT {
+
+ private static HBaseTestingUtility hbaseTestUtil;
+ private static String zkQuorum;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ hbaseTestUtil = new HBaseTestingUtility();
+ Configuration conf = hbaseTestUtil.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ hbaseTestUtil.startMiniCluster();
+ hbaseTestUtil.startMiniMapReduceCluster();
+ Class.forName(PhoenixDriver.class.getName());
+ zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ }
+
+ @Test
+ public void testImmutableGlobalIndex() throws Exception {
+ testSecondaryIndex("DATA_TABLE1",true, false);
+ }
+
+ @Test
+ public void testImmutableLocalIndex() throws Exception {
+ testSecondaryIndex("DATA_TABLE2",true, true);
+ }
+
+ @Test
+ public void testMutableGlobalIndex() throws Exception {
+ testSecondaryIndex("DATA_TABLE3",false, false);
+ }
+
+ @Test
+ public void testMutableLocalIndex() throws Exception {
+ testSecondaryIndex("DATA_TABLE4",false, true);
+ }
+
+ public void testSecondaryIndex(final String dataTable , final boolean isImmutable , final boolean isLocal) throws Exception {
+
+ final String indxTable = String.format("%s_%s",dataTable,"INDX");
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+ Statement stmt = conn.createStatement();
+ try {
+
+ stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s",dataTable, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ int id = 1;
+ // insert two rows
+ upsertRow(stmt1, id++);
+ upsertRow(stmt1, id++);
+ conn.commit();
+
+ stmt.execute(String.format("CREATE %s INDEX %s ON %s (UPPER(NAME)) ASYNC ", (isLocal ? "LOCAL" : ""), indxTable,dataTable));
+
+ //verify rows are fetched from data table.
+ String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+ //assert we are pulling from data table.
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+
+ rs = stmt1.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME1", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+
+ //run the index MR job.
+ final IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+
+ final String[] cmdArgs = getArgValues(dataTable,indxTable);
+ int status = indexingTool.run(cmdArgs);
+ assertEquals(0, status);
+
+ // insert two more rows
+ upsertRow(stmt1, 3);
+ upsertRow(stmt1, 4);
+ conn.commit();
+
+ //assert we are pulling from index table.
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(actualExplainPlan,dataTable,indxTable,isLocal);
+
+ rs = stmt.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME1", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
+
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+ assertEquals(2, rs.getInt(2));
+
+ assertTrue(rs.next());
+ assertEquals("UNAME3", rs.getString(1));
+ assertEquals(3, rs.getInt(2));
+
+ assertTrue(rs.next());
+ assertEquals("UNAME4", rs.getString(1));
+ assertEquals(4, rs.getInt(2));
+
+ assertFalse(rs.next());
+
+ conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable));
+ } finally {
+ conn.close();
+ }
+ }
+
+
+ /**
+ * Asserts that updates made to rows of a mutable table after an index is created in ASYNC mode, but before
+ * the MR job runs, show up in the index table.
+ * @throws Exception
+ */
+ @Test
+ public void testMutalbleIndexWithUpdates() throws Exception {
+
+ final String dataTable = "DATA_TABLE5";
+ final String indxTable = String.format("%s_%s",dataTable,"INDX");
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+ Statement stmt = conn.createStatement();
+ try {
+
+ stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+ String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+ PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+ int id = 1;
+ // insert two rows
+ upsertRow(stmt1, id++);
+ upsertRow(stmt1, id++);
+ conn.commit();
+
+ stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
+
+ //update a row
+ stmt1.setInt(1, 1);
+ stmt1.setString(2, "uname" + String.valueOf(10));
+ stmt1.setInt(3, 95050 + 1);
+ stmt1.executeUpdate();
+ conn.commit();
+
+ //verify rows are fetched from data table.
+ String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+
+ //assert we are pulling from data table.
+ assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+
+ rs = stmt1.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME10", rs.getString(1));
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+
+ //run the index MR job.
+ final IndexTool indexingTool = new IndexTool();
+ indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+
+ final String[] cmdArgs = getArgValues(dataTable,indxTable);
+ int status = indexingTool.run(cmdArgs);
+ assertEquals(0, status);
+
+ //assert we are pulling from index table.
+ rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+ actualExplainPlan = QueryUtil.getExplainPlan(rs);
+ assertExplainPlan(actualExplainPlan,dataTable,indxTable,false);
+
+ rs = stmt.executeQuery(selectSql);
+ assertTrue(rs.next());
+ assertEquals("UNAME10", rs.getString(1));
+ assertEquals(1, rs.getInt(2));
+
+ assertTrue(rs.next());
+ assertEquals("UNAME2", rs.getString(1));
+ assertEquals(2, rs.getInt(2));
+ conn.createStatement().execute(String.format("DROP INDEX %s ON %s",indxTable , dataTable));
+ } finally {
+ conn.close();
+ }
+ }
+
+ private void assertExplainPlan(final String actualExplainPlan, String dataTable,
+ String indxTable, boolean isLocal) {
+
+ String expectedExplainPlan = "";
+ if(isLocal) {
+ final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
+ expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
+ + "\n SERVER FILTER BY FIRST KEY ONLY"
+ + "\nCLIENT MERGE SORT", localIndexName);
+ } else {
+ expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+ + "\n SERVER FILTER BY FIRST KEY ONLY",indxTable);
+ }
+ assertEquals(expectedExplainPlan,actualExplainPlan);
+ }
+
+ private String[] getArgValues(String dataTable, String indxTable) {
+ final List<String> args = Lists.newArrayList();
+ args.add("-dt");
+ args.add(dataTable);
+ args.add("-it");
+ args.add(indxTable);
+ args.add("-op");
+ args.add("/tmp/"+UUID.randomUUID().toString());
+ return args.toArray(new String[0]);
+ }
+
+ private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+ // insert row
+ stmt.setInt(1, i);
+ stmt.setString(2, "uname" + String.valueOf(i));
+ stmt.setInt(3, 95050 + i);
+ stmt.executeUpdate();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ try {
+ PhoenixDriver.INSTANCE.close();
+ } finally {
+ try {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ } finally {
+ try {
+ hbaseTestUtil.shutdownMiniMapReduceCluster();
+ } finally {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 9e843a0..9750ee7 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -84,6 +84,7 @@ tokens
WITHIN='within';
SET='set';
CAST='cast';
+ ACTIVE='active';
USABLE='usable';
UNUSABLE='unusable';
DISABLE='disable';
@@ -109,6 +110,7 @@ tokens
STATISTICS='statistics';
COLUMNS='columns';
TRACE='trace';
+ ASYNC='async';
}
@@ -405,9 +407,10 @@ create_index_node returns [CreateIndexStatement ret]
: CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
(LPAREN ik=ik_constraint RPAREN)
(INCLUDE (LPAREN icrefs=column_names RPAREN))?
+ (async=ASYNC)?
(p=fam_properties)?
(SPLIT ON v=value_expression_list)?
- {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, getBindCount()); }
+ {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); }
;
// Parse a create sequence statement.
@@ -498,7 +501,7 @@ drop_index_node returns [DropIndexStatement ret]
// Parse a alter index statement
alter_index_node returns [AlterIndexStatement ret]
- : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE)
+ : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
{ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index c8cf28e..5836b99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -28,6 +28,8 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.IndexUtil;
+import com.google.common.collect.Lists;
+
/**
* Class that compiles plan to generate initial data values after a DDL command for
@@ -36,10 +38,15 @@ import org.apache.phoenix.util.IndexUtil;
public class PostIndexDDLCompiler {
private final PhoenixConnection connection;
private final TableRef dataTableRef;
-
+ private List<String> indexColumnNames;
+ private List<String> dataColumnNames;
+ private String selectQuery;
+
public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
this.connection = connection;
this.dataTableRef = dataTableRef;
+ indexColumnNames = Lists.newArrayList();
+ dataColumnNames = Lists.newArrayList();
}
public MutationPlan compile(final PTable indexTable) throws SQLException {
@@ -66,8 +73,11 @@ public class PostIndexDDLCompiler {
for (int i = posOffset; i < nIndexPKColumns; i++) {
PColumn col = indexPKColumns.get(i);
String indexColName = col.getName().getString();
- dataColumns.append(col.getExpressionStr()).append(",");
+ String dataColName = col.getExpressionStr();
+ dataColumns.append(dataColName).append(",");
indexColumns.append('"').append(indexColName).append("\",");
+ indexColumnNames.add(indexColName);
+ dataColumnNames.add(dataColName);
}
// Add the covered columns
@@ -82,6 +92,8 @@ public class PostIndexDDLCompiler {
}
dataColumns.append('"').append(dataColumnName).append("\",");
indexColumns.append('"').append(indexColName).append("\",");
+ indexColumnNames.add(indexColName);
+ dataColumnNames.add(dataColumnName);
}
}
}
@@ -93,11 +105,27 @@ public class PostIndexDDLCompiler {
StringBuilder updateStmtStr = new StringBuilder();
updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
- .append(indexColumns).append(") SELECT ").append(dataColumns).append(" FROM ")
- .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+ .append(indexColumns).append(") ");
+ final StringBuilder selectQueryBuilder = new StringBuilder();
+ selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ")
+ .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+ this.selectQuery = selectQueryBuilder.toString();
+ updateStmtStr.append(this.selectQuery);
final PhoenixStatement statement = new PhoenixStatement(connection);
return statement.compileMutation(updateStmtStr.toString());
}
+ public List<String> getIndexColumnNames() {
+ return indexColumnNames;
+ }
+
+ public List<String> getDataColumnNames() {
+ return dataColumnNames;
+ }
+
+ public String getSelectQuery() {
+ return selectQuery;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 746c0b7..4e61391 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -535,8 +535,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
- ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
- super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+ ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+ super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async , bindCount);
}
@SuppressWarnings("unchecked")
@@ -879,8 +879,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
@Override
public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
- ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
- return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+ ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+ return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index e62cbb8..8f0f7d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.mapreduce;
import java.util.List;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.util.ColumnInfo;
+import com.google.common.base.Preconditions;
+
/**
* Collection of utility methods for setting up bulk import jobs.
*/
@@ -65,7 +67,7 @@ public class CsvBulkImportUtil {
*/
public static void configurePreUpsertProcessor(Configuration conf,
Class<? extends ImportPreUpsertKeyValueProcessor> processorClass) {
- conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
+ conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
ImportPreUpsertKeyValueProcessor.class);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 6ff7ba3..90cb854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -28,15 +28,6 @@ import java.util.Properties;
import javax.annotation.Nullable;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
@@ -49,9 +40,9 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -59,6 +50,15 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
/**
* MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles.
* <p/>
@@ -73,9 +73,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
- /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
- public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
-
/** Configuration key for the field delimiter for input csv records */
public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
@@ -136,7 +133,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
- preUpdateProcessor = loadPreUpsertProcessor(conf);
+ preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){
tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY));
} else {
@@ -193,23 +190,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
}
/**
- * Load the configured ImportPreUpsertKeyValueProcessor, or supply a dummy processor.
- */
- @VisibleForTesting
- static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
- Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
- try {
- processorClass = conf.getClass(
- UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
- ImportPreUpsertKeyValueProcessor.class);
- } catch (Exception e) {
- throw new IllegalStateException("Couldn't load upsert hook class", e);
- }
-
- return ReflectionUtils.newInstance(processorClass, conf);
- }
-
- /**
* Build up the JDBC URL for connecting to Phoenix.
*
* @return the full JDBC URL for a Phoenix connection
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 7c67c2c..a83b9ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.PhoenixRuntime;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -99,7 +101,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
Preconditions.checkNotNull(context);
try{
- final Connection connection = ConnectionUtil.getConnection(configuration);
+ final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final Properties overridingProps = new Properties();
+ if(currentScnValue != null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+ }
+ final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps);
final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
Preconditions.checkNotNull(selectStatement);
final Statement statement = connection.createStatement();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
new file mode 100644
index 0000000..4a869d9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+/**
+ * Counters used during Map Reduce jobs.
+ * <p>
+ * The per-constant notes below describe how PhoenixIndexImportMapper (added in the same
+ * change) increments them; other jobs may use them differently.
+ */
+public enum PhoenixJobCounters {
+
+ /** Incremented once at the start of every map() invocation. */
+ INPUT_RECORDS,
+ /** Incremented when a SQLException is caught while reading/writing a record. */
+ FAILED_RECORDS,
+ /** Incremented once per uncommitted-data entry whose KeyValues were written to the output. */
+ OUTPUT_RECORDS;
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
new file mode 100644
index 0000000..d93ef9c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An MR job to populate the index table from the data table.
+ *
+ */
+public class IndexTool extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
+
+ // Command line options. Only the data table, index table and output path are enforced
+ // as mandatory by parseOptions(); the schema name may be omitted.
+ private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)");
+ private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)");
+ private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name(mandatory)");
+ private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written(mandatory)");
+ private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+
+ // Format arguments, in order: index table name, data table name, target index state name.
+ private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s";
+ // Format arguments, in order: data table name, index table name.
+ private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+
+
+ private Options getOptions() {
+ final Options options = new Options();
+ options.addOption(SCHEMA_NAME_OPTION);
+ options.addOption(DATA_TABLE_OPTION);
+ options.addOption(INDEX_TABLE_OPTION);
+ options.addOption(OUTPUT_PATH_OPTION);
+ options.addOption(HELP_OPTION);
+ return options;
+ }
+
+    /**
+     * Parses the command line arguments. Prints the usage and exits when parsing fails or
+     * when help is requested; throws IllegalStateException when the data table, index table
+     * or output path argument is missing (checked in that order).
+     *
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    private CommandLine parseOptions(String[] args) {
+
+        final Options options = getOptions();
+
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = new PosixParser().parse(options, args);
+        } catch (ParseException e) {
+            // printHelpAndExit terminates the JVM, so cmdLine is never dereferenced while null.
+            printHelpAndExit("Error parsing command line options: "+ e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        final Option[] mandatory = { DATA_TABLE_OPTION, INDEX_TABLE_OPTION, OUTPUT_PATH_OPTION };
+        for (final Option required : mandatory) {
+            if (!cmdLine.hasOption(required.getOpt())) {
+                throw new IllegalStateException(required.getLongOpt() + " is a mandatory "
+                        + "parameter");
+            }
+        }
+        return cmdLine;
+    }
+
+
+    /**
+     * Writes the error message to stderr, prints the usage and exits with status 1.
+     *
+     * @param errorMessage message describing what went wrong
+     * @param options options to describe in the usage output
+     */
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    /**
+     * Prints the usage for the given options and terminates the JVM.
+     *
+     * @param options options to describe in the usage output
+     * @param exitCode process exit status
+     */
+    private void printHelpAndExit(Options options, int exitCode) {
+        new HelpFormatter().printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Connection connection = null;
+ try {
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ }
+ final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+ final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+ final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+ final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+ final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
+ final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
+
+ connection = ConnectionUtil.getConnection(configuration);
+ if(!isValidIndexTable(connection, dataTable, indexTable)) {
+ throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
+ }
+
+ final PTable pdataTable = PhoenixRuntime.getTable(connection, dataTable);
+ final PTable pindexTable = PhoenixRuntime.getTable(connection, indexTable);
+
+ // this is set to ensure index tables remains consistent post population.
+ long indxTimestamp = pindexTable.getTimeStamp();
+ configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,Long.toString(indxTimestamp + 1));
+
+ // check if the index type is LOCAL, if so, set the logicalIndexName that is computed from the dataTable name.
+ String logicalIndexTable = qIndexTable;
+ if(IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+ logicalIndexTable = MetaDataUtil.getLocalIndexTableName(dataTable);
+ }
+
+ final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+ final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection,new TableRef(pdataTable));
+ ddlCompiler.compile(pindexTable);
+
+ final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
+ final String selectQuery = ddlCompiler.getSelectQuery();
+ final String upsertQuery = QueryUtil.constructUpsertStatement(indexTable, indexColumns, Hint.NO_INDEX);
+
+ configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
+ PhoenixConfigurationUtil.setOutputTableName(configuration, logicalIndexTable);
+ PhoenixConfigurationUtil.setUpsertColumnNames(configuration,indexColumns.toArray(new String[indexColumns.size()]));
+ final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+ configuration.set(PhoenixConfigurationUtil.UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+
+ final Path outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()),logicalIndexTable);
+
+ final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE,dataTable,indexTable);
+ final Job job = Job.getInstance(configuration, jobName);
+ job.setJarByClass(IndexTool.class);
+
+ job.setMapperClass(PhoenixIndexImportMapper.class);
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,dataTable,selectQuery);
+
+ TableMapReduceUtil.initCredentials(job);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ final HTable htable = new HTable(configuration, logicalIndexTable);
+ HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+ boolean status = job.waitForCompletion(true);
+ if (!status) {
+ LOG.error("Failed to run the IndexTool job. ");
+ htable.close();
+ return -1;
+ }
+
+ LOG.info("Loading HFiles from {}", outputPath);
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
+ loader.doBulkLoad(outputPath, htable);
+ htable.close();
+
+ LOG.info("Removing output directory {}", outputPath);
+ if (!FileSystem.get(configuration).delete(outputPath, true)) {
+ LOG.error("Removing output directory {} failed", outputPath);
+ }
+
+ // finally update the index state to ACTIVE.
+ updateIndexState(connection,dataTable,indexTable,PIndexState.ACTIVE);
+ return 0;
+
+ } catch (Exception ex) {
+ LOG.error(" An exception occured while performing the indexing job , error message {} ",ex.getMessage());
+ return -1;
+ } finally {
+ try {
+ if(connection != null) {
+ connection.close();
+ }
+ } catch(SQLException sqle) {
+ LOG.error(" Failed to close connection ",sqle.getMessage());
+ throw new RuntimeException("Failed to close connection");
+ }
+ }
+ }
+
+    /**
+     * Checks whether the index table passed to the job is an index of the given data table.
+     *
+     * @param connection connection used to read the database metadata
+     * @param masterTable full name of the data table; split into schema and table parts
+     * @param indexTable index name, compared case-insensitively against each index reported
+     * @return true if an index with that name is reported for the data table, false otherwise
+     * @throws SQLException if the metadata cannot be read or the result set cannot be closed
+     */
+    private boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable) throws SQLException {
+        final DatabaseMetaData dbMetaData = connection.getMetaData();
+        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
+        final String tableName = SchemaUtil.getTableNameFromFullName(masterTable);
+
+        boolean matched = false;
+        ResultSet indexes = null;
+        try {
+            indexes = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false);
+            // Column 6 of the getIndexInfo result is INDEX_NAME per the JDBC specification.
+            while (!matched && indexes.next()) {
+                matched = indexTable.equalsIgnoreCase(indexes.getString(6));
+            }
+        } finally {
+            if (indexes != null) {
+                indexes.close();
+            }
+        }
+        return matched;
+    }
+
+    /**
+     * Updates the index state by issuing an ALTER INDEX statement.
+     *
+     * @param connection open connection; must not be null
+     * @param masterTable name of the data table the index belongs to
+     * @param indexTable name of the index to alter
+     * @param state state to switch the index to
+     * @throws SQLException if the statement cannot be created, executed or closed
+     */
+    private void updateIndexState(Connection connection, final String masterTable , final String indexTable, PIndexState state) throws SQLException {
+        Preconditions.checkNotNull(connection);
+        final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE,indexTable,masterTable,state.name());
+        // Fully qualified so the file's import block does not need to change.
+        final java.sql.Statement statement = connection.createStatement();
+        try {
+            statement.execute(alterQuery);
+        } finally {
+            // Release the statement even when the ALTER fails.
+            statement.close();
+        }
+        LOG.info(" Updated the status of the index {} to {} " , indexTable , state.name());
+    }
+
+    /**
+     * Command line entry point: runs the tool through ToolRunner and exits with its result.
+     *
+     * @param args command line arguments
+     * @throws Exception if ToolRunner propagates a failure
+     */
+    public static void main(final String[] args) throws Exception {
+        System.exit(ToolRunner.run(new IndexTool(), args));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
new file mode 100644
index 0000000..2be810a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link DBWritable} that reads every column of a result set row into a list of values
+ * and binds such a list to a prepared statement using the configured column metadata.
+ */
+public class PhoenixIndexDBWritable implements DBWritable {
+
+    /** Metadata of the columns bound in {@link #write(PreparedStatement)}, by position. */
+    private List<ColumnInfo> columnMetadata;
+
+    /** Values of the current record, by position. */
+    private List<Object> values;
+
+    /** Number of result set columns; -1 until the first call to readFields. */
+    private int columnCount = -1;
+
+    /**
+     * Binds the current values to the statement, 1-based, using the SQL type taken from the
+     * column metadata at the same position.
+     *
+     * @param statement statement to bind the values to
+     * @throws SQLException if a parameter cannot be set
+     */
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        Preconditions.checkNotNull(values);
+        Preconditions.checkNotNull(columnMetadata);
+        final int total = values.size();
+        for (int idx = 0; idx < total; idx++) {
+            final Object current = values.get(idx);
+            final int sqlType = columnMetadata.get(idx).getSqlType();
+            final int position = idx + 1;
+            if (current != null) {
+                statement.setObject(position, current, sqlType);
+            } else {
+                statement.setNull(position, sqlType);
+            }
+        }
+    }
+
+    /**
+     * Replaces the current values with the objects of every column of the current row.
+     *
+     * @param resultSet result set positioned on the row to read
+     * @throws SQLException if the metadata or a column value cannot be read
+     */
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+        // The column count is looked up only on the first call and cached afterwards.
+        if (columnCount == -1) {
+            columnCount = resultSet.getMetaData().getColumnCount();
+        }
+
+        values = Lists.newArrayListWithCapacity(columnCount);
+        for (int column = 1; column <= columnCount; column++) {
+            values.add(resultSet.getObject(column));
+        }
+    }
+
+    public List<ColumnInfo> getColumnMetadata() {
+        return columnMetadata;
+    }
+
+    public void setColumnMetadata(List<ColumnInfo> columnMetadata) {
+        this.columnMetadata = columnMetadata;
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Object> values) {
+        this.values = values;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
new file mode 100644
index 0000000..7bf4bfc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that hands over rows from data table to the index table.
+ * <p>
+ * Each input record is bound to the configured upsert statement and executed on a
+ * non-auto-commit connection; the resulting uncommitted KeyValues for the index table are
+ * emitted as map output and the connection is rolled back afterwards.
+ */
+public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, KeyValue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportMapper.class);
+
+    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
+
+    private List<ColumnInfo> indxTblColumnMetadata ;
+
+    private Connection connection;
+
+    private String indexTableName;
+
+    /** Byte form of {@link #indexTableName}, computed once in setup instead of per entry. */
+    private byte[] indexTableNameBytes;
+
+    private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+
+    private PreparedStatement pStatement;
+
+    /**
+     * Reads the upsert metadata, processor, output table and upsert statement from the job
+     * configuration and opens the connection used to generate the KeyValues.
+     *
+     * @throws RuntimeException wrapping the SQLException if the connection or the statement
+     *         cannot be created
+     */
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        try {
+            indxTblColumnMetadata = PhoenixConfigurationUtil.getUpsertColumnMetadataList(configuration);
+            indxWritable.setColumnMetadata(indxTblColumnMetadata);
+
+            preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration);
+            indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration);
+            indexTableNameBytes = Bytes.toBytes(indexTableName);
+
+            // Properties rejects null values, so only override the SCN when it is configured
+            // (the same guard PhoenixInputFormat applies).
+            final Properties overrideProps = new Properties();
+            final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            if (currentScnValue != null) {
+                overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            }
+            connection = ConnectionUtil.getConnection(configuration,overrideProps);
+            connection.setAutoCommit(false);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.pStatement = connection.prepareStatement(upsertQuery);
+
+        } catch (SQLException e) {
+            // Keep the cause so the original stack trace is not lost.
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Executes the upsert for one record and writes the uncommitted KeyValues that belong to
+     * the index table to the output, then rolls the connection back.
+     *
+     * @throws RuntimeException wrapping the SQLException if the upsert fails
+     */
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+
+        try {
+            final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+            final List<Object> values = record.getValues();
+            indxWritable.setValues(values);
+            indxWritable.write(this.pStatement);
+            this.pStatement.execute();
+
+            final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true);
+            while (uncommittedDataIterator.hasNext()) {
+                final Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                if (Bytes.compareTo(indexTableNameBytes, kvPair.getFirst()) != 0) {
+                    // skip edits for other tables
+                    continue;
+                }
+                List<KeyValue> keyValueList = kvPair.getSecond();
+                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+                for (KeyValue kv : keyValueList) {
+                    outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+                    context.write(outputKey, kv);
+                }
+                context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+            }
+            // The data is only needed as KeyValues; nothing is committed through Phoenix.
+            connection.rollback();
+        } catch (SQLException e) {
+            LOG.error(" Error {} while read/write of a record ",e.getMessage());
+            context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Closes the upsert statement and the connection; close failures are logged, not rethrown.
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        if (pStatement != null) {
+            try {
+                pStatement.close();
+            } catch (SQLException e) {
+                LOG.error("Error {} while closing statement in the PhoenixIndexMapper class ",e.getMessage());
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",e.getMessage());
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index 364baf7..3234967 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -42,6 +42,17 @@ public class ConnectionUtil {
* @throws SQLException
*/
public static Connection getConnection(final Configuration configuration) throws SQLException {
+ return getConnection(configuration, null);
+ }
+
+ /**
+ * Used primarily in cases where we need to pass few additional/overriding properties
+ * @param configuration
+ * @param properties
+ * @return
+ * @throws SQLException
+ */
+ public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException {
Preconditions.checkNotNull(configuration);
final Properties props = new Properties();
Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
@@ -51,6 +62,9 @@ public class ConnectionUtil {
props.setProperty(entry.getKey(), entry.getValue());
}
}
+ if(properties != null && !properties.isEmpty()) {
+ props.putAll(properties);
+ }
final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
return conn;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 4d025ee..b8b64b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -33,7 +33,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -83,6 +86,11 @@ public final class PhoenixConfigurationUtil {
public static final String INPUT_CLASS = "phoenix.input.class";
+ public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value";
+
+ /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
+ public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
+
public enum SchemaType {
TABLE,
QUERY;
@@ -313,4 +321,17 @@ public final class PhoenixConfigurationUtil {
//In order to have phoenix working on a secured cluster
TableMapReduceUtil.initCredentials(job);
}
+
+    /**
+     * Instantiates the ImportPreUpsertKeyValueProcessor configured under
+     * {@link #UPSERT_HOOK_CLASS_CONFKEY}, falling back to
+     * DefaultImportPreUpsertKeyValueProcessor when no class is configured.
+     *
+     * @param conf configuration to read the class name from and to pass to the new instance
+     * @return a new processor instance
+     * @throws IllegalStateException if the configured class cannot be loaded
+     */
+    public static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
+        final Class<? extends ImportPreUpsertKeyValueProcessor> hookClass;
+        try {
+            hookClass = conf.getClass(
+                    UPSERT_HOOK_CLASS_CONFKEY,
+                    DefaultImportPreUpsertKeyValueProcessor.class,
+                    ImportPreUpsertKeyValueProcessor.class);
+        } catch (Exception e) {
+            throw new IllegalStateException("Couldn't load upsert hook class", e);
+        }
+        return ReflectionUtils.newInstance(hookClass, conf);
+    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
index bf76174..5f52f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
@@ -35,10 +35,11 @@ public class CreateIndexStatement extends SingleTableStatement {
private final ListMultimap<String,Pair<String,Object>> props;
private final boolean ifNotExists;
private final IndexType indexType;
+ private final boolean async;
public CreateIndexStatement(NamedNode indexTableName, NamedTableNode dataTable,
IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
- ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
+ ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
super(dataTable, bindCount);
this.indexTableName =TableName.create(dataTable.getName().getSchemaName(),indexTableName.getName());
this.indexKeyConstraint = indexKeyConstraint == null ? IndexKeyConstraint.EMPTY : indexKeyConstraint;
@@ -47,6 +48,7 @@ public class CreateIndexStatement extends SingleTableStatement {
this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
this.ifNotExists = ifNotExists;
this.indexType = indexType;
+ this.async = async;
}
public IndexKeyConstraint getIndexConstraint() {
@@ -78,4 +80,8 @@ public class CreateIndexStatement extends SingleTableStatement {
return indexType;
}
+ /**
+ * @return the async flag this statement was constructed with
+ */
+ public boolean isAsync() {
+ return async;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4e8f792..2a4168d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -282,8 +282,8 @@ public class ParseNodeFactory {
return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
}
- public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
- return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+ public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount) {
+ return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
}
public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e133433..ab3c284 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1128,6 +1128,11 @@ public class MetaDataClient {
return new MutationState(0,connection);
}
+ // In async process, we return immediately as the MR job needs to be triggered.
+ if(statement.isAsync()) {
+ return new MutationState(0, connection);
+ }
+
// If our connection is at a fixed point-in-time, we need to open a new
// connection so that our new index table is visible.
if (connection.getSCN() != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 55865c0..46350be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -16,7 +16,6 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PDataType;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
/**
@@ -33,6 +32,9 @@ public class ColumnInfo {
public ColumnInfo(String columnName, int sqlType) {
Preconditions.checkNotNull(columnName, "columnName cannot be null");
Preconditions.checkArgument(!columnName.isEmpty(), "columnName cannot be empty");
+ if(!columnName.startsWith(SchemaUtil.ESCAPE_CHARACTER)) {
+ columnName = SchemaUtil.getEscapedFullColumnName(columnName);
+ }
this.columnName = columnName;
this.sqlType = sqlType;
}
@@ -63,7 +65,7 @@ public class ColumnInfo {
@Override
public String toString() {
- return columnName + STR_SEPARATOR + getPDataType().getSqlTypeName();
+ return getPDataType().getSqlTypeName() + STR_SEPARATOR + columnName ;
}
@Override
@@ -97,14 +99,15 @@ public class ColumnInfo {
*/
public static ColumnInfo fromString(String stringRepresentation) {
List<String> components =
- Lists.newArrayList(Splitter.on(STR_SEPARATOR).split(stringRepresentation));
+ Lists.newArrayList(stringRepresentation.split(":",2));
+
if (components.size() != 2) {
throw new IllegalArgumentException("Unparseable string: " + stringRepresentation);
}
return new ColumnInfo(
- components.get(0),
- PDataType.fromSqlTypeName(components.get(1)).getSqlType());
+ components.get(1),
+ PDataType.fromSqlTypeName(components.get(0)).getSqlType());
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index e0b4c2e..993016a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.WildcardParseNode;
import org.apache.phoenix.query.QueryServices;
@@ -109,21 +111,53 @@ public final class QueryUtil {
throw new IllegalArgumentException("At least one column must be provided for upserts");
}
+ final List<String> columnNames = Lists.transform(columnInfos, new Function<ColumnInfo,String>() {
+ @Override
+ public String apply(ColumnInfo columnInfo) {
+ return columnInfo.getColumnName();
+ }
+ });
+ return constructUpsertStatement(tableName, columnNames, null);
+
+ }
+
+ /**
+ * Generate an upsert statement with parameter markers based on a list of column names. The list of
+ * column names must contain at least one element.
+ *
+ * @param tableName name of the table for which the upsert statement is to be created
+ * @param columns list of columns to be included in the upsert statement
+ * @param hint hint to be added to the UPSERT statement.
+ * @return the created {@code UPSERT} statement
+ */
+ public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
+
+ if (columns.isEmpty()) {
+ throw new IllegalArgumentException("At least one column must be provided for upserts");
+ }
+
+ String hintStr = "";
+ if(hint != null) {
+ final HintNode node = new HintNode(hint.name());
+ hintStr = node.toString();
+ }
+
List<String> parameterList = Lists.newArrayList();
- for (int i = 0; i < columnInfos.size(); i++) {
+ for (int i = 0; i < columns.size(); i++) {
parameterList.add("?");
}
return String.format(
- "UPSERT INTO %s (%s) VALUES (%s)",
+ "UPSERT %s INTO %s (%s) VALUES (%s)",
+ hintStr,
tableName,
Joiner.on(", ").join(
Iterables.transform(
- columnInfos,
- new Function<ColumnInfo, String>() {
+ columns,
+ new Function<String, String>() {
@Nullable
@Override
- public String apply(@Nullable ColumnInfo columnInfo) {
- return getEscapedFullColumnName(columnInfo.getColumnName());
+ public String apply(@Nullable String columnName) {
+ return getEscapedFullColumnName(columnName);
}
})),
Joiner.on(", ").join(parameterList));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c9574e3..1d986c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -632,6 +632,9 @@ public class SchemaUtil {
}
public static String getEscapedFullColumnName(String fullColumnName) {
+ if(fullColumnName.startsWith(ESCAPE_CHARACTER)) {
+ return fullColumnName;
+ }
int index = fullColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
if (index < 0) {
return getEscapedArgument(fullColumnName);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 4cb5732..a00e228 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,17 +17,19 @@
*/
package org.apache.phoenix.mapreduce;
-import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ColumnInfo;
import org.junit.Test;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import com.google.common.collect.ImmutableList;
public class CsvBulkImportUtilTest {
@@ -57,7 +59,7 @@ public class CsvBulkImportUtilTest {
public void testConfigurePreUpsertProcessor() {
Configuration conf = new Configuration();
CsvBulkImportUtil.configurePreUpsertProcessor(conf, MockProcessor.class);
- ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+ ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
assertEquals(MockProcessor.class, processor.getClass());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index ee9d0e1..4033a65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,23 +17,25 @@
*/
package org.apache.phoenix.mapreduce;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PIntegerArray;
import org.apache.phoenix.schema.types.PUnsignedInt;
import org.apache.phoenix.util.ColumnInfo;
import org.junit.Test;
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
public class CsvToKeyValueMapperTest {
@@ -110,10 +112,10 @@ public class CsvToKeyValueMapperTest {
@Test
public void testLoadPreUpdateProcessor() {
Configuration conf = new Configuration();
- conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+ conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
ImportPreUpsertKeyValueProcessor.class);
- ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+ ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
assertEquals(MockUpsertProcessor.class, processor.getClass());
}
@@ -121,7 +123,7 @@ public class CsvToKeyValueMapperTest {
public void testLoadPreUpdateProcessor_NotConfigured() {
Configuration conf = new Configuration();
- ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+ ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
processor.getClass());
@@ -130,9 +132,9 @@ public class CsvToKeyValueMapperTest {
@Test(expected=IllegalStateException.class)
public void testLoadPreUpdateProcessor_ClassNotFound() {
Configuration conf = new Configuration();
- conf.set(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+ conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
- CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+ PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
index 931d6fd..7f460cd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
@@ -30,7 +30,7 @@ public class ColumnInfoTest {
@Test
public void testToFromStringRoundTrip() {
- ColumnInfo columnInfo = new ColumnInfo("myColumn", Types.INTEGER);
+ ColumnInfo columnInfo = new ColumnInfo("a.myColumn", Types.INTEGER);
assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
}
@@ -49,4 +49,10 @@ public class ColumnInfoTest {
assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), sqlE.getErrorCode());
}
}
+
+ @Test
+ public void testToFromColonInColumnName() {
+ ColumnInfo columnInfo = new ColumnInfo(":myColumn", Types.INTEGER);
+ assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 33e3b5a..beabaf1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -38,7 +38,7 @@ public class QueryUtilTest {
@Test
public void testConstructUpsertStatement_ColumnInfos() {
assertEquals(
- "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
+ "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
QueryUtil.constructUpsertStatement("MYTAB", ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 7a861b9..abfb442 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
@@ -65,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
// expected schema.
final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
- fields[0] = new ResourceFieldSchema().setName("ID")
+ fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
.setType(DataType.LONG);
- fields[1] = new ResourceFieldSchema().setName("NAME")
+ fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
.setType(DataType.CHARARRAY);
final ResourceSchema expected = new ResourceSchema().setFields(fields);