You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2017/03/09 23:02:46 UTC
[33/50] [abbrv] phoenix git commit: PHOENIX-3346 Hive
PhoenixStorageHandler doesn't work well with column mapping
PHOENIX-3346 Hive PhoenixStorageHandler doesn't work well with column mapping
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7201dd5e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7201dd5e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7201dd5e
Branch: refs/heads/omid
Commit: 7201dd5e17096209d26ca3620054fc72665cf4fe
Parents: 5f5662b
Author: Sergey Soldatov <ss...@apache.org>
Authored: Wed Mar 1 11:51:46 2017 -0800
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Wed Mar 1 11:55:06 2017 -0800
----------------------------------------------------------------------
phoenix-hive/pom.xml | 13 +
.../phoenix/hive/BaseHivePhoenixStoreIT.java | 165 ++++++++++
.../apache/phoenix/hive/HiveMapReduceIT.java | 32 ++
.../apache/phoenix/hive/HivePhoenixStoreIT.java | 330 ++++++++++---------
.../org/apache/phoenix/hive/HiveTestUtil.java | 22 +-
.../java/org/apache/phoenix/hive/HiveTezIT.java | 32 ++
.../apache/phoenix/hive/PhoenixMetaHook.java | 37 +--
.../org/apache/phoenix/hive/PhoenixSerDe.java | 9 +-
.../apache/phoenix/hive/PhoenixSerializer.java | 4 +
.../phoenix/hive/PhoenixStorageHandler.java | 5 +
.../hive/mapreduce/PhoenixInputFormat.java | 3 +-
.../hive/mapreduce/PhoenixRecordReader.java | 1 +
.../hive/mapreduce/PhoenixResultWritable.java | 12 +-
.../phoenix/hive/query/PhoenixQueryBuilder.java | 76 ++++-
.../phoenix/hive/util/ColumnMappingUtils.java | 76 +++++
.../hive/util/PhoenixConnectionUtil.java | 19 ++
.../hive/query/PhoenixQueryBuilderTest.java | 10 +-
17 files changed, 604 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-hive/pom.xml b/phoenix-hive/pom.xml
index e6d3f86..c6f5d40 100644
--- a/phoenix-hive/pom.xml
+++ b/phoenix-hive/pom.xml
@@ -110,6 +110,19 @@
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-tests</artifactId>
+ <scope>test</scope>
+ <version>0.8.4</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <scope>test</scope>
+ <version>0.8.4</version>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java
new file mode 100644
index 0000000..ac0a7fc
--- /dev/null
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/BaseHivePhoenixStoreIT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.AfterClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.*;
+import java.util.Properties;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for all Hive Phoenix integration tests that may be run with Tez or MR mini cluster
+ */
+public class BaseHivePhoenixStoreIT {
+
+ private static final Log LOG = LogFactory.getLog(BaseHivePhoenixStoreIT.class);
+ protected static HBaseTestingUtility hbaseTestUtil;
+ protected static MiniHBaseCluster hbaseCluster;
+ private static String zkQuorum;
+ protected static Connection conn;
+ private static Configuration conf;
+ protected static HiveTestUtil qt;
+ protected static String hiveOutputDir;
+ protected static String hiveLogDir;
+
+
+ public static void setup(HiveTestUtil.MiniClusterType clusterType)throws Exception {
+ String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+ if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) {
+ LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause "
+ + "issues with test execution via MiniDFSCluster");
+ }
+ hbaseTestUtil = new HBaseTestingUtility();
+ conf = hbaseTestUtil.getConfiguration();
+ setUpConfigForMiniCluster(conf);
+ conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString();
+ File outputDir = new File(hiveOutputDir);
+ outputDir.mkdirs();
+ hiveLogDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString();
+ File logDir = new File(hiveLogDir);
+ logDir.mkdirs();
+ // Setup Hive mini Server
+ Path testRoot = hbaseTestUtil.getDataTestDir();
+ System.setProperty("test.tmp.dir", testRoot.toString());
+ System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString());
+
+ try {
+ qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, clusterType, null);
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in setup", e);
+ fail("Unexpected exception in setup");
+ }
+
+ //Start HBase cluster
+ hbaseCluster = hbaseTestUtil.startMiniCluster(3);
+ MiniDFSCluster x = hbaseTestUtil.getDFSCluster();
+ Class.forName(PhoenixDriver.class.getName());
+ zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+ conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL +
+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props);
+ // Setup Hive Output Folder
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("create table t(a integer primary key,b varchar)");
+ }
+
+ protected void runTest(String fname, String fpath) throws Exception {
+ long startTime = System.currentTimeMillis();
+ try {
+ LOG.info("Begin query: " + fname);
+ qt.addFile(fpath);
+
+ if (qt.shouldBeSkipped(fname)) {
+ LOG.info("Test " + fname + " skipped");
+ return;
+ }
+
+ qt.cliInit(fname);
+ qt.clearTestSideEffects();
+ int ecode = qt.executeClient(fname);
+ if (ecode != 0) {
+ qt.failed(ecode, fname, null);
+ return;
+ }
+
+ ecode = qt.checkCliDriverResults(fname);
+ if (ecode != 0) {
+ qt.failedDiff(ecode, fname, null);
+ }
+ qt.clearPostTestEffects();
+
+ } catch (Throwable e) {
+ qt.failed(e, fname, null);
+ }
+
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s");
+ assertTrue("Test passed", true);
+ }
+
+ protected void createFile(String content, String fullName) throws IOException {
+ FileUtils.write(new File(fullName), content);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (qt != null) {
+ try {
+ qt.shutdown();
+ } catch (Exception e) {
+ LOG.error("Unexpected exception in setup", e);
+ fail("Unexpected exception in tearDown");
+ }
+ }
+ try {
+ conn.close();
+ } finally {
+ try {
+ PhoenixDriver.INSTANCE.close();
+ } finally {
+ try {
+ DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+ } finally {
+ hbaseTestUtil.shutdownMiniCluster();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java
new file mode 100644
index 0000000..7203597
--- /dev/null
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveMapReduceIT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hive;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HiveMapReduceIT extends HivePhoenixStoreIT {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setup(HiveTestUtil.MiniClusterType.mr);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
index a707a06..cf12a80 100644
--- a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HivePhoenixStoreIT.java
@@ -17,99 +17,22 @@
*/
package org.apache.phoenix.hive;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.StringUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import java.io.File;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Properties;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
- * Test class to run all Hive Phoenix integration tests against a MINI Map-Reduce cluster.
+ * Test methods only. All supporting methods should be placed to BaseHivePhoenixStoreIT
*/
-@Category(NeedsOwnMiniClusterTest.class)
-public class HivePhoenixStoreIT {
- private static final Log LOG = LogFactory.getLog(HivePhoenixStoreIT.class);
- private static HBaseTestingUtility hbaseTestUtil;
- private static String zkQuorum;
- private static Connection conn;
- private static Configuration conf;
- private static HiveTestUtil qt;
- private static String hiveOutputDir;
- private static String hiveLogDir;
-
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
- if (null != hadoopConfDir && !hadoopConfDir.isEmpty()) {
- LOG.warn("WARNING: HADOOP_CONF_DIR is set in the environment which may cause "
- + "issues with test execution via MiniDFSCluster");
- }
- hbaseTestUtil = new HBaseTestingUtility();
- conf = hbaseTestUtil.getConfiguration();
- setUpConfigForMiniCluster(conf);
- conf.set(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
- hiveOutputDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_output").toString();
- File outputDir = new File(hiveOutputDir);
- outputDir.mkdirs();
- hiveLogDir = new Path(hbaseTestUtil.getDataTestDir(), "hive_log").toString();
- File logDir = new File(hiveLogDir);
- logDir.mkdirs();
- // Setup Hive mini Server
- Path testRoot = hbaseTestUtil.getDataTestDir();
- System.setProperty("test.tmp.dir", testRoot.toString());
- System.setProperty("test.warehouse.dir", (new Path(testRoot, "warehouse")).toString());
-
- HiveTestUtil.MiniClusterType miniMR = HiveTestUtil.MiniClusterType.mr;
- try {
- qt = new HiveTestUtil(hiveOutputDir, hiveLogDir, miniMR, null);
- } catch (Exception e) {
- LOG.error("Unexpected exception in setup", e);
- fail("Unexpected exception in setup");
- }
-
- //Start HBase cluster
- hbaseTestUtil.startMiniCluster(3);
- MiniDFSCluster x = hbaseTestUtil.getDFSCluster();
-
- Class.forName(PhoenixDriver.class.getName());
- zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
- Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
- props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
- conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL +
- PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum, props);
- // Setup Hive Output Folder
-
- Statement stmt = conn.createStatement();
- stmt.execute("create table t(a integer primary key,b varchar)");
- }
+@Ignore("This class contains only test methods and should not be executed directly")
+public class HivePhoenixStoreIT extends BaseHivePhoenixStoreIT {
/**
* Create a table with two column, insert 1 row, check that phoenix table is created and
@@ -120,7 +43,6 @@ public class HivePhoenixStoreIT {
@Test
public void simpleTest() throws Exception {
String testName = "simpleTest";
- // create a dummy outfile under log folder
hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
@@ -129,9 +51,11 @@ public class HivePhoenixStoreIT {
" STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
.CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF +
" 'phoenix.table.name'='phoenix_table'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster()
- .getClientPort() + "', 'phoenix.rowkeys'='id');");
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id');");
sb.append("INSERT INTO TABLE phoenix_table" + HiveTestUtil.CRLF +
"VALUES ('10', '1000');" + HiveTestUtil.CRLF);
String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
@@ -145,9 +69,48 @@ public class HivePhoenixStoreIT {
assertTrue(rs.next());
assert (rs.getString(1).equals("10"));
assert (rs.getString(2).equals("1000"));
+ }
+
+ /**
+ * Create hive table with custom column mapping
+ * @throws Exception
+ */
+
+ @Test
+ public void simpleColumnMapTest() throws Exception {
+ String testName = "cmTest";
+ hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+ createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
+ createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE column_table(ID STRING, P1 STRING, p2 STRING)" + HiveTestUtil.CRLF +
+ " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+ .CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+ " 'phoenix.table.name'='column_table'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.column.mapping' = 'id:C1, p1:c2, p2:C3'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id');");
+ sb.append("INSERT INTO TABLE column_table" + HiveTestUtil.CRLF +
+ "VALUES ('1', '2', '3');" + HiveTestUtil.CRLF);
+ String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
+ createFile(sb.toString(), fullPath);
+ runTest(testName, fullPath);
+
+ String phoenixQuery = "SELECT C1, \"c2\", C3 FROM column_table";
+ PreparedStatement statement = conn.prepareStatement(phoenixQuery);
+ ResultSet rs = statement.executeQuery();
+ assert (rs.getMetaData().getColumnCount() == 3);
+ assertTrue(rs.next());
+ assert (rs.getString(1).equals("1"));
+ assert (rs.getString(2).equals("2"));
+ assert (rs.getString(3).equals("3"));
}
+
/**
* Datatype Test
*
@@ -156,22 +119,22 @@ public class HivePhoenixStoreIT {
@Test
public void dataTypeTest() throws Exception {
String testName = "dataTypeTest";
- // create a dummy outfile under log folder
hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
StringBuilder sb = new StringBuilder();
- sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, ts TIMESTAMP, db " +
+ sb.append("CREATE TABLE phoenix_datatype(ID int, description STRING, ts TIMESTAMP, db " +
"DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
" STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
.CRLF + " TBLPROPERTIES(" + HiveTestUtil.CRLF +
" 'phoenix.hbase.table.name'='phoenix_datatype'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster()
- .getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
" 'phoenix.rowkeys'='id');");
sb.append("INSERT INTO TABLE phoenix_datatype" + HiveTestUtil.CRLF +
- "VALUES (10, \"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" + HiveTestUtil.CRLF);
+ "VALUES (10, \"foodesc\", \"2013-01-05 01:01:01\", 200,2.0,-1);" + HiveTestUtil.CRLF);
String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
createFile(sb.toString(), fullPath);
runTest(testName, fullPath);
@@ -183,10 +146,6 @@ public class HivePhoenixStoreIT {
while (rs.next()) {
assert (rs.getInt(1) == 10);
assert (rs.getString(2).equalsIgnoreCase("foodesc"));
- /* Need a way how to correctly handle timestamp since Hive's implementation uses
- time zone information but Phoenix doesn't.
- */
- //assert(rs.getTimestamp(3).equals(Timestamp.valueOf("2013-01-05 02:01:01")));
assert (rs.getDouble(4) == 200);
assert (rs.getFloat(5) == 2.0);
assert (rs.getInt(6) == -1);
@@ -201,23 +160,22 @@ public class HivePhoenixStoreIT {
@Test
public void MultiKey() throws Exception {
String testName = "MultiKey";
- // create a dummy outfile under log folder
hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
createFile(StringUtil.EMPTY_STRING, new Path(hiveOutputDir, testName + ".out").toString());
StringBuilder sb = new StringBuilder();
- sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 String,description STRING, ts " +
- "TIMESTAMP, db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+ sb.append("CREATE TABLE phoenix_MultiKey(ID int, ID2 String,description STRING," +
+ "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
" STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
.CRLF +
" TBLPROPERTIES(" + HiveTestUtil.CRLF +
" 'phoenix.hbase.table.name'='phoenix_MultiKey'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.znode.parent'='hbase'," + HiveTestUtil.CRLF +
- " 'phoenix.zookeeper.quorum'='localhost:" + hbaseTestUtil.getZkCluster()
- .getClientPort() + "'," + HiveTestUtil.CRLF +
- " 'phoenix.rowkeys'='id,id2');");
- sb.append("INSERT INTO TABLE phoenix_MultiKey" + HiveTestUtil.CRLF +
- "VALUES (10, \"part2\",\"foodesc\",\"2013-01-05 01:01:01\",200,2.0,-1);" +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF);
+ sb.append("INSERT INTO TABLE phoenix_MultiKey VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" +
HiveTestUtil.CRLF);
String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
createFile(sb.toString(), fullPath);
@@ -226,78 +184,128 @@ public class HivePhoenixStoreIT {
String phoenixQuery = "SELECT * FROM phoenix_MultiKey";
PreparedStatement statement = conn.prepareStatement(phoenixQuery);
ResultSet rs = statement.executeQuery();
- assert (rs.getMetaData().getColumnCount() == 7);
+ assert (rs.getMetaData().getColumnCount() == 6);
while (rs.next()) {
assert (rs.getInt(1) == 10);
assert (rs.getString(2).equalsIgnoreCase("part2"));
assert (rs.getString(3).equalsIgnoreCase("foodesc"));
- //assert(rs.getTimestamp(4).equals(Timestamp.valueOf("2013-01-05 02:01:01")));
- assert (rs.getDouble(5) == 200);
- assert (rs.getFloat(6) == 2.0);
- assert (rs.getInt(7) == -1);
+ assert (rs.getDouble(4) == 200);
+ assert (rs.getFloat(5) == 2.0);
+ assert (rs.getInt(6) == -1);
}
}
+ /**
+ * Test that hive is able to access Phoenix data during MR job (creating two tables and perform join on it)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testJoinNoColumnMaps() throws Exception {
+ String testName = "testJoin";
+ hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+ createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
+ createFile("10\tpart2\tfoodesc\t200.0\t2.0\t-1\t10\tpart2\tfoodesc\t200.0\t2.0\t-1\n",
+ new Path(hiveOutputDir, testName + ".out").toString());
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE joinTable1(ID int, ID2 String,description STRING," +
+ "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+ " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+ .CRLF +
+ " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+ " 'phoenix.hbase.table.name'='joinTable1'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF);
+ sb.append("CREATE TABLE joinTable2(ID int, ID2 String,description STRING," +
+ "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+ " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+ .CRLF +
+ " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+ " 'phoenix.hbase.table.name'='joinTable2'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF);
+
+ sb.append("INSERT INTO TABLE joinTable1 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
+ sb.append("INSERT INTO TABLE joinTable1 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
- private void runTest(String fname, String fpath) throws Exception {
- long startTime = System.currentTimeMillis();
- try {
- LOG.info("Begin query: " + fname);
- qt.addFile(fpath);
+ sb.append("INSERT INTO TABLE joinTable2 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
+ sb.append("INSERT INTO TABLE joinTable2 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
- if (qt.shouldBeSkipped(fname)) {
- LOG.info("Test " + fname + " skipped");
- return;
- }
+ sb.append("SELECT * from joinTable1 A join joinTable2 B on A.ID = B.ID WHERE A.ID=10;" +
+ HiveTestUtil.CRLF);
- qt.cliInit(fname);
- qt.clearTestSideEffects();
- int ecode = qt.executeClient(fname);
- if (ecode != 0) {
- qt.failed(ecode, fname, null);
- }
+ String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
+ createFile(sb.toString(), fullPath);
+ runTest(testName, fullPath);
+ }
+
+ /**
+ * Test that hive is able to access Phoenix data during MR job (creating two tables and perform join on it)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testJoinColumnMaps() throws Exception {
+ String testName = "testJoin";
+ hbaseTestUtil.getTestFileSystem().createNewFile(new Path(hiveLogDir, testName + ".out"));
+ createFile("10\t200.0\tpart2\n", new Path(hiveOutputDir, testName + ".out").toString());
+ createFile(StringUtil.EMPTY_STRING, new Path(hiveLogDir, testName + ".out").toString());
- ecode = qt.checkCliDriverResults(fname);
- if (ecode != 0) {
- qt.failedDiff(ecode, fname, null);
- }
- qt.clearPostTestEffects();
+ StringBuilder sb = new StringBuilder();
+ sb.append("CREATE TABLE joinTable3(ID int, ID2 String,description STRING," +
+ "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+ " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+ .CRLF +
+ " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+ " 'phoenix.hbase.table.name'='joinTable3'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.column.mapping' = 'id:i1, id2:I2'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF);
+ sb.append("CREATE TABLE joinTable4(ID int, ID2 String,description STRING," +
+ "db DOUBLE,fl FLOAT, us INT)" + HiveTestUtil.CRLF +
+ " STORED BY \"org.apache.phoenix.hive.PhoenixStorageHandler\"" + HiveTestUtil
+ .CRLF +
+ " TBLPROPERTIES(" + HiveTestUtil.CRLF +
+ " 'phoenix.hbase.table.name'='joinTable4'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.znode.parent'='/hbase'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.quorum'='localhost'," + HiveTestUtil.CRLF +
+ " 'phoenix.zookeeper.client.port'='" +
+ hbaseTestUtil.getZkCluster().getClientPort() + "'," + HiveTestUtil.CRLF +
+ " 'phoenix.column.mapping' = 'id:i1, id2:I2'," + HiveTestUtil.CRLF +
+ " 'phoenix.rowkeys'='id,id2');" + HiveTestUtil.CRLF);
- } catch (Throwable e) {
- qt.failed(e, fname, null);
- }
+ sb.append("INSERT INTO TABLE joinTable3 VALUES (5, \"part1\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
+ sb.append("INSERT INTO TABLE joinTable3 VALUES (10, \"part1\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
- long elapsedTime = System.currentTimeMillis() - startTime;
- LOG.info("Done query: " + fname + " elapsedTime=" + elapsedTime / 1000 + "s");
- assertTrue("Test passed", true);
- }
+ sb.append("INSERT INTO TABLE joinTable4 VALUES (5, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
+ sb.append("INSERT INTO TABLE joinTable4 VALUES (10, \"part2\",\"foodesc\",200,2.0,-1);" + HiveTestUtil.CRLF);
- private void createFile(String content, String fullName) throws IOException {
- FileUtils.write(new File(fullName), content);
- }
+ sb.append("SELECT A.ID, a.db, B.ID2 from joinTable3 A join joinTable4 B on A.ID = B.ID WHERE A.ID=10;" +
+ HiveTestUtil.CRLF);
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- if (qt != null) {
- try {
- qt.shutdown();
- } catch (Exception e) {
- LOG.error("Unexpected exception in setup", e);
- fail("Unexpected exception in tearDown");
- }
- }
- try {
- conn.close();
- } finally {
- try {
- PhoenixDriver.INSTANCE.close();
- } finally {
- try {
- DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
- } finally {
- hbaseTestUtil.shutdownMiniCluster();
- }
- }
+ String fullPath = new Path(hbaseTestUtil.getDataTestDir(), testName).toString();
+ createFile(sb.toString(), fullPath);
+ runTest(testName, fullPath);
+ //Test that Phoenix has correctly mapped columns. We are checking both, primary key and
+ // regular columns mapped and not mapped
+ String phoenixQuery = "SELECT \"i1\", \"I2\", \"db\" FROM joinTable3 where \"i1\" = 10 AND \"I2\" = 'part1' AND \"db\" = 200";
+ PreparedStatement statement = conn.prepareStatement(phoenixQuery);
+ ResultSet rs = statement.executeQuery();
+ assert (rs.getMetaData().getColumnCount() == 3);
+ while (rs.next()) {
+ assert (rs.getInt(1) == 10);
+ assert (rs.getString(2).equalsIgnoreCase("part1"));
+ assert (rs.getDouble(3) == 200);
}
+
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
index 3407ffb..f5823ea 100644
--- a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTestUtil.java
@@ -691,6 +691,7 @@ public class HiveTestUtil {
}
public int executeClient(String tname) {
+ conf.set("mapreduce.job.name", "test");
return cliDriver.processLine(getCommands(tname), false);
}
@@ -1110,27 +1111,6 @@ public class HiveTestUtil {
}
/**
- * Setup to execute a set of query files. Uses HiveTestUtil to do so.
- *
- * @param qfiles array of input query files containing arbitrary number of hive
- * queries
- * @param resDir output directory
- * @param logDir log directory
- * @return one HiveTestUtil for each query file
- */
- public static HiveTestUtil[] queryListRunnerSetup(File[] qfiles, String resDir,
- String logDir) throws Exception {
- HiveTestUtil[] qt = new HiveTestUtil[qfiles.length];
- for (int i = 0; i < qfiles.length; i++) {
- qt[i] = new HiveTestUtil(resDir, logDir, MiniClusterType.mr, null, "0.20");
- qt[i].addFile(qfiles[i]);
- qt[i].clearTestSideEffects();
- }
-
- return qt;
- }
-
- /**
* Executes a set of query files in sequence.
*
* @param qfiles array of input query files containing arbitrary number of hive
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java
new file mode 100644
index 0000000..a675a0e
--- /dev/null
+++ b/phoenix-hive/src/it/java/org/apache/phoenix/hive/HiveTezIT.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hive;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HiveTezIT extends HivePhoenixStoreIT {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setup(HiveTestUtil.MiniClusterType.tez);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
index ae3675f..c35634a 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixMetaHook.java
@@ -35,9 +35,12 @@ import org.apache.phoenix.hive.util.PhoenixUtil;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.phoenix.hive.util.ColumnMappingUtils.getColumnMappingMap;
+
/**
* Implementation for notification methods which are invoked as part of transactions against the
* hive metastore,allowing Phoenix metadata to be kept in sync with Hive'smetastore.
@@ -105,6 +108,10 @@ public class PhoenixMetaHook implements HiveMetaHook {
String rowKeyName = getRowKeyMapping(fieldName, phoenixRowKeyList);
if (rowKeyName != null) {
+ String columnName = columnMappingMap.get(fieldName);
+ if(columnName != null) {
+ rowKeyName = columnName;
+ }
// In case of RowKey
if ("binary".equals(columnType)) {
// Phoenix must define max length of binary when type definition. Obtaining
@@ -115,9 +122,9 @@ public class PhoenixMetaHook implements HiveMetaHook {
rowKeyName = tokenList.get(0);
}
- ddl.append(" ").append(rowKeyName).append(" ").append(columnType).append(" not " +
+ ddl.append(" ").append("\"").append(rowKeyName).append("\"").append(" ").append(columnType).append(" not " +
"null,\n");
- realRowKeys.append(rowKeyName).append(",");
+ realRowKeys.append("\"").append(rowKeyName).append("\",");
} else {
// In case of Column
String columnName = columnMappingMap.get(fieldName);
@@ -136,7 +143,7 @@ public class PhoenixMetaHook implements HiveMetaHook {
columnName = tokenList.get(0);
}
- ddl.append(" ").append(columnName).append(" ").append(columnType).append(",\n");
+ ddl.append(" ").append("\"").append(columnName).append("\"").append(" ").append(columnType).append(",\n");
}
}
ddl.append(" ").append("constraint pk_").append(PhoenixUtil.getTableSchema(tableName.toUpperCase())[1]).append(" primary key(")
@@ -173,30 +180,6 @@ public class PhoenixMetaHook implements HiveMetaHook {
return rowKeyMapping;
}
- private Map<String, String> getColumnMappingMap(String columnMappings) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Column mappings : " + columnMappings);
- }
-
- if (columnMappings == null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("phoenix.column.mapping not set. using field definition");
- }
-
- return Collections.emptyMap();
- }
-
- Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA)
- .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split
- (columnMappings);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Column mapping map : " + columnMappingMap);
- }
-
- return columnMappingMap;
- }
-
@Override
public void rollbackCreateTable(Table table) throws MetaException {
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
index dd38cfb..9ef0158 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerDe.java
@@ -84,14 +84,7 @@ public class PhoenixSerDe extends AbstractSerDe {
}
serializer = new PhoenixSerializer(conf, tbl);
- row = new PhoenixRow(Lists.transform(serdeParams.getColumnNames(), new Function<String,
- String>() {
-
- @Override
- public String apply(String input) {
- return input.toUpperCase();
- }
- }));
+ row = new PhoenixRow(serdeParams.getColumnNames());
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
index e43ed0e..852407a 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixSerializer.java
@@ -63,6 +63,10 @@ public class PhoenixSerializer {
private PhoenixResultWritable pResultWritable;
public PhoenixSerializer(Configuration config, Properties tbl) throws SerDeException {
+ String mapping = tbl.getProperty(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, null);
+ if(mapping!=null ) {
+ config.set(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, mapping);
+ }
try (Connection conn = PhoenixConnectionUtil.getInputConnection(config, tbl)) {
List<ColumnInfo> columnMetadata = PhoenixUtil.getColumnInfoList(conn, tbl.getProperty
(PhoenixStorageHandlerConstants.PHOENIX_TABLE_NAME));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
index a425b7c..ae8f242 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/PhoenixStorageHandler.java
@@ -194,6 +194,11 @@ public class PhoenixStorageHandler extends DefaultStorageHandler implements
jobProperties.put(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT, tableProperties
.getProperty(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT,
PhoenixStorageHandlerConstants.DEFAULT_ZOOKEEPER_PARENT));
+ String columnMapping = tableProperties
+ .getProperty(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING);
+ if(columnMapping != null) {
+ jobProperties.put(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING, columnMapping);
+ }
jobProperties.put(hive_metastoreConstants.META_TABLE_STORAGE, this.getClass().getName());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index 9ebc3d6..f0a5dd6 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -91,7 +91,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
String query;
String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname,
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
-
if (LOG.isDebugEnabled()) {
LOG.debug("Target table name at split phase : " + tableName + "with whereCondition :" +
jobConf.get(TableScanDesc.FILTER_TEXT_CONF_STR) +
@@ -151,7 +150,7 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri
setScanCacheSize(jobConf);
// Adding Localization
- HConnection connection = HConnectionManager.createConnection(jobConf);
+ HConnection connection = HConnectionManager.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf));
RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
.getTableRef().getTable().getPhysicalName().toString()));
RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
index 5cdf234..ca27686 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixRecordReader.java
@@ -71,6 +71,7 @@ public class PhoenixRecordReader<T extends DBWritable> implements
private PhoenixResultSet resultSet;
private long readCount;
+
private boolean isTransactional;
public PhoenixRecordReader(Class<T> inputClass, final Configuration configuration, final
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
index 18ded89..2bdc7b2 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixResultWritable.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.hive.PhoenixRowKey;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.util.ColumnMappingUtils;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.hive.util.PhoenixUtil;
import org.apache.phoenix.util.ColumnInfo;
@@ -52,6 +53,7 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable
private List<ColumnInfo> columnMetadataList;
private List<Object> valueList; // for output
private Map<String, Object> rowMap = Maps.newHashMap(); // for input
+ private Map<String, String> columnMap;
private int columnCount = -1;
@@ -71,7 +73,6 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable
throws IOException {
this(config);
this.columnMetadataList = columnMetadataList;
-
valueList = Lists.newArrayListWithExpectedSize(columnMetadataList.size());
}
@@ -158,8 +159,12 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable
for (int i = 0; i < columnCount; i++) {
Object value = resultSet.getObject(i + 1);
-
- rowMap.put(rsmd.getColumnName(i + 1), value);
+ String columnName = rsmd.getColumnName(i + 1);
+ String mapName = columnMap.get(columnName);
+ if(mapName != null) {
+ columnName = mapName;
+ }
+ rowMap.put(columnName, value);
}
// Adding row__id column.
@@ -195,6 +200,7 @@ public class PhoenixResultWritable implements Writable, DBWritable, Configurable
@Override
public void setConf(Configuration conf) {
config = conf;
+ this.columnMap = ColumnMappingUtils.getReverseColumnMapping(config.get(PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING,""));
isTransactional = PhoenixStorageHandlerUtil.isTransactionalTable(config);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java
index ebc5fc0..210a377 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/query/PhoenixQueryBuilder.java
@@ -42,10 +42,13 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.mapred.JobConf;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
+import org.apache.phoenix.hive.util.ColumnMappingUtils;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.hive.util.PhoenixUtil;
import org.apache.phoenix.util.StringUtil;
+import static org.apache.phoenix.hive.util.ColumnMappingUtils.getColumnMappingMap;
+
/**
* Query builder. Produces a query depending on the colummn list and conditions
*/
@@ -91,13 +94,16 @@ public class PhoenixQueryBuilder {
TypeInfo> columnTypeMap) throws IOException {
StringBuilder sql = new StringBuilder();
List<String> conditionColumnList = buildWhereClause(jobConf, sql, whereClause, columnTypeMap);
+ readColumnList = replaceColumns(jobConf, readColumnList);
if (conditionColumnList.size() > 0) {
addConditionColumnToReadColumn(readColumnList, conditionColumnList);
+ readColumnList = ColumnMappingUtils.quoteColumns(readColumnList);
sql.insert(0, queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$",
getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$",
tableName));
} else {
+ readColumnList = ColumnMappingUtils.quoteColumns(readColumnList);
sql.append(queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$",
getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$",
tableName));
@@ -110,18 +116,46 @@ public class PhoenixQueryBuilder {
return sql.toString();
}
+ private static String findReplacement(JobConf jobConf, String column) {
+ Map<String, String> columnMappingMap = getColumnMappingMap(jobConf.get
+ (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING));
+ if (columnMappingMap != null && columnMappingMap.containsKey(column)) {
+ return columnMappingMap.get(column);
+ } else {
+ return column;
+ }
+ }
+ private static List<String> replaceColumns(JobConf jobConf, List<String> columnList) {
+ Map<String, String> columnMappingMap = getColumnMappingMap(jobConf.get
+ (PhoenixStorageHandlerConstants.PHOENIX_COLUMN_MAPPING));
+ if(columnMappingMap != null) {
+ List<String> newList = Lists.newArrayList();
+ for(String column:columnList) {
+ if(columnMappingMap.containsKey(column)) {
+ newList.add(columnMappingMap.get(column));
+ } else {
+ newList.add(column);
+ }
+ }
+ return newList;
+ }
+ return null;
+ }
+
private String makeQueryString(JobConf jobConf, String tableName, List<String>
readColumnList, List<IndexSearchCondition> searchConditions, String queryTemplate,
String hints) throws IOException {
StringBuilder query = new StringBuilder();
- List<String> conditionColumnList = buildWhereClause(query, searchConditions);
+ List<String> conditionColumnList = buildWhereClause(jobConf, query, searchConditions);
if (conditionColumnList.size() > 0) {
+ readColumnList = replaceColumns(jobConf, readColumnList);
addConditionColumnToReadColumn(readColumnList, conditionColumnList);
query.insert(0, queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$",
getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$",
tableName));
} else {
+ readColumnList = replaceColumns(jobConf, readColumnList);
query.append(queryTemplate.replace("$HINT$", hints).replace("$COLUMN_LIST$",
getSelectColumns(jobConf, tableName, readColumnList)).replace("$TABLE_NAME$",
tableName));
@@ -136,7 +170,7 @@ public class PhoenixQueryBuilder {
private String getSelectColumns(JobConf jobConf, String tableName, List<String>
readColumnList) throws IOException {
- String selectColumns = Joiner.on(PhoenixStorageHandlerConstants.COMMA).join(readColumnList);
+ String selectColumns = Joiner.on(PhoenixStorageHandlerConstants.COMMA).join(ColumnMappingUtils.quoteColumns(readColumnList));
if (PhoenixStorageHandlerConstants.EMPTY_STRING.equals(selectColumns)) {
selectColumns = "*";
@@ -146,10 +180,8 @@ public class PhoenixQueryBuilder {
StringBuilder pkColumns = new StringBuilder();
for (String pkColumn : pkColumnList) {
- String pkColumnName = pkColumn.toLowerCase();
-
- if (!readColumnList.contains(pkColumnName)) {
- pkColumns.append(pkColumnName).append(PhoenixStorageHandlerConstants.COMMA);
+ if (!readColumnList.contains(pkColumn)) {
+ pkColumns.append("\"").append(pkColumn).append("\"" + PhoenixStorageHandlerConstants.COMMA);
}
}
@@ -218,7 +250,10 @@ public class PhoenixQueryBuilder {
for (String columnName : columnTypeMap.keySet()) {
if (whereClause.contains(columnName)) {
- conditionColumnList.add(columnName);
+ String column = findReplacement(jobConf, columnName);
+ whereClause = StringUtils.replaceEach(whereClause, new String[] {columnName}, new String[] {"\""+column + "\""});
+ conditionColumnList.add(column);
+
if (PhoenixStorageHandlerConstants.DATE_TYPE.equals(
columnTypeMap.get(columnName).getTypeName())) {
@@ -617,7 +652,7 @@ public class PhoenixQueryBuilder {
return itsMine;
}
- protected List<String> buildWhereClause(StringBuilder sql,
+ protected List<String> buildWhereClause(JobConf jobConf, StringBuilder sql,
List<IndexSearchCondition> conditions)
throws IOException {
if (conditions == null || conditions.size() == 0) {
@@ -628,21 +663,27 @@ public class PhoenixQueryBuilder {
sql.append(" where ");
Iterator<IndexSearchCondition> iter = conditions.iterator();
- appendExpression(sql, iter.next(), columns);
+ appendExpression(jobConf, sql, iter.next(), columns);
while (iter.hasNext()) {
sql.append(" and ");
- appendExpression(sql, iter.next(), columns);
+ appendExpression(jobConf, sql, iter.next(), columns);
}
return columns;
}
- private void appendExpression(StringBuilder sql, IndexSearchCondition condition,
+ private void appendExpression(JobConf jobConf, StringBuilder sql, IndexSearchCondition condition,
List<String> columns) {
Expression expr = findExpression(condition);
if (expr != null) {
- sql.append(expr.buildExpressionStringFrom(condition));
- columns.add(condition.getColumnDesc().getColumn());
+ sql.append(expr.buildExpressionStringFrom(jobConf, condition));
+ String column = condition.getColumnDesc().getColumn();
+ String rColumn = findReplacement(jobConf, column);
+ if(rColumn != null) {
+ column = rColumn;
+ }
+
+ columns.add(column);
}
}
@@ -719,10 +760,15 @@ public class PhoenixQueryBuilder {
return condition.getComparisonOp().endsWith(hiveCompOp) && checkCondition(condition);
}
- public String buildExpressionStringFrom(IndexSearchCondition condition) {
+ public String buildExpressionStringFrom(JobConf jobConf, IndexSearchCondition condition) {
final String type = condition.getColumnDesc().getTypeString();
+ String column = condition.getColumnDesc().getColumn();
+ String rColumn = findReplacement(jobConf, column);
+ if(rColumn != null) {
+ column = rColumn;
+ }
return JOINER_SPACE.join(
- condition.getColumnDesc().getColumn(),
+ "\"" + column + "\"",
getSqlCompOpString(condition),
joiner != null ? createConstants(type, condition.getConstantDescs()) :
createConstant(type, condition.getConstantDesc()));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java
new file mode 100644
index 0000000..f348c0f
--- /dev/null
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/ColumnMappingUtils.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hive.util;
+
+import com.google.common.base.Splitter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+
+import java.util.*;
+
+
+/**
+ * Util class for mapping between Hive and Phoenix column names
+ */
+public class ColumnMappingUtils {
+
+ private static final Log LOG = LogFactory.getLog(ColumnMappingUtils.class);
+
+ public static Map<String, String> getColumnMappingMap(String columnMappings) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column mappings : " + columnMappings);
+ }
+
+ if (columnMappings == null || columnMappings.length() == 0) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("phoenix.column.mapping not set. using field definition");
+ }
+
+ return Collections.emptyMap();
+ }
+
+ Map<String, String> columnMappingMap = Splitter.on(PhoenixStorageHandlerConstants.COMMA)
+ .trimResults().withKeyValueSeparator(PhoenixStorageHandlerConstants.COLON).split
+ (columnMappings);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Column mapping map : " + columnMappingMap);
+ }
+
+ return columnMappingMap;
+ }
+
+ public static Map<String, String> getReverseColumnMapping(String columnMapping) {
+ Map<String, String> myNewHashMap = new LinkedHashMap<>();
+ Map<String, String> forward = getColumnMappingMap(columnMapping);
+ for(Map.Entry<String, String> entry : forward.entrySet()){
+ myNewHashMap.put(entry.getValue(), entry.getKey());
+ }
+ return myNewHashMap;
+ }
+
+ public static List<String> quoteColumns(List<String> readColumnList) {
+ List<String> newList = new LinkedList<>();
+ for(String column : readColumnList) {
+ newList.add("\""+ column + "\"");
+ }
+ return newList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java
index 51f6c7e..b32419a 100644
--- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java
+++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/util/PhoenixConnectionUtil.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.hive.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -94,4 +97,20 @@ public class PhoenixConnectionUtil {
clientPort, zNodeParent) : QueryUtil.getUrl(quorum), props);
}
+ public static Configuration getConfiguration(JobConf jobConf) {
+ Configuration conf = new Configuration(jobConf);
+ String quorum = conf.get(PhoenixStorageHandlerConstants.ZOOKEEPER_QUORUM);
+ if(quorum!=null) {
+ conf.set(HConstants.ZOOKEEPER_QUORUM, quorum);
+ }
+ int zooKeeperClientPort = conf.getInt(PhoenixStorageHandlerConstants.ZOOKEEPER_PORT, 0);
+ if(zooKeeperClientPort != 0) {
+ conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zooKeeperClientPort);
+ }
+ String zNodeParent = conf.get(PhoenixStorageHandlerConstants.ZOOKEEPER_PARENT);
+ if(zNodeParent != null) {
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zNodeParent);
+ }
+ return conf;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/7201dd5e/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java b/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java
index 1dc6e25..e4f872e 100644
--- a/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java
+++ b/phoenix-hive/src/test/java/org/apache/phoenix/hive/query/PhoenixQueryBuilderTest.java
@@ -76,8 +76,8 @@ public class PhoenixQueryBuilderTest {
public void testBuildQueryWithCharColumns() throws IOException {
final String COLUMN_CHAR = "Column_Char";
final String COLUMN_VARCHAR = "Column_VChar";
- final String expectedQueryPrefix = "select /*+ NO_CACHE */ " + COLUMN_CHAR + "," + COLUMN_VARCHAR +
- " from TEST_TABLE where ";
+ final String expectedQueryPrefix = "select /*+ NO_CACHE */ \"" + COLUMN_CHAR + "\",\"" + COLUMN_VARCHAR +
+ "\" from TEST_TABLE where ";
JobConf jobConf = new JobConf();
List<String> readColumnList = Lists.newArrayList(COLUMN_CHAR, COLUMN_VARCHAR);
@@ -86,7 +86,7 @@ public class PhoenixQueryBuilderTest {
mockedIndexSearchCondition("GenericUDFOPEqual", "CHAR_VALUE2", null, COLUMN_VARCHAR, "varchar(10)", false)
);
- assertEquals(expectedQueryPrefix + "Column_Char = 'CHAR_VALUE' and Column_VChar = 'CHAR_VALUE2'",
+ assertEquals(expectedQueryPrefix + "\"Column_Char\" = 'CHAR_VALUE' and \"Column_VChar\" = 'CHAR_VALUE2'",
BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions));
searchConditions = Lists.newArrayList(
@@ -94,7 +94,7 @@ public class PhoenixQueryBuilderTest {
new Object[]{"CHAR1", "CHAR2", "CHAR3"}, COLUMN_CHAR, "char(10)", false)
);
- assertEquals(expectedQueryPrefix + "Column_Char in ('CHAR1', 'CHAR2', 'CHAR3')",
+ assertEquals(expectedQueryPrefix + "\"Column_Char\" in ('CHAR1', 'CHAR2', 'CHAR3')",
BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions));
searchConditions = Lists.newArrayList(
@@ -110,7 +110,7 @@ public class PhoenixQueryBuilderTest {
new Object[]{"CHAR1", "CHAR2"}, COLUMN_CHAR, "char(10)", false)
);
- assertEquals(expectedQueryPrefix + "Column_Char between 'CHAR1' and 'CHAR2'",
+ assertEquals(expectedQueryPrefix + "\"Column_Char\" between 'CHAR1' and 'CHAR2'",
BUILDER.buildQuery(jobConf, TABLE_NAME, readColumnList, searchConditions));
searchConditions = Lists.newArrayList(