Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/09/21 19:28:40 UTC
[2/2] git commit: Phoenix 1050 : Support for DataByteArray
Phoenix 1050 : Support for DataByteArray
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2ccb62d1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2ccb62d1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2ccb62d1
Branch: refs/heads/4.0
Commit: 2ccb62d181afb3e54fe4940f78490f858b9a710a
Parents: 27b3865
Author: mravi <ma...@gmail.com>
Authored: Sun Sep 21 10:27:51 2014 -0700
Committer: mravi <ma...@gmail.com>
Committed: Sun Sep 21 10:27:51 2014 -0700
----------------------------------------------------------------------
.../phoenix/pig/PhoenixHBaseStorerIT.java | 115 +++++++++++++++++++
.../org/apache/phoenix/pig/util/TypeUtil.java | 48 ++++----
2 files changed, 139 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
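
The substance of this commit: TypeUtil switches the converter used for Pig bytearray
fields from Utf8StorageConverter to HBaseBinaryConverter, so a DataByteArray that
carries HBase-native binary (for example, bytes produced with Bytes.toBytes) is decoded
as binary instead of being parsed as UTF-8 text. A minimal standalone sketch of that
difference (not part of the patch; the class and method names are the real Pig/HBase
ones, the sample value is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
    import org.apache.pig.builtin.Utf8StorageConverter;
    import org.apache.pig.data.DataByteArray;

    public class ConverterSketch {
        public static void main(String[] args) throws IOException {
            // A Pig bytearray holding the HBase binary encoding of the int 42.
            DataByteArray dba = new DataByteArray(Bytes.toBytes(42));

            // The binary converter decodes the 4-byte encoding back to 42.
            Integer asBinary = new HBaseBinaryConverter().bytesToInteger(dba.get());

            // The UTF-8 converter tries to parse the same bytes as the text "42";
            // they are not text, so the parse fails and the value comes back null.
            Integer asUtf8 = new Utf8StorageConverter().bytesToInteger(dba.get());

            System.out.println("binary: " + asBinary + ", utf8: " + asUtf8);
        }
    }

The new testStoreWithBinaryDataTypes below relies on exactly this round-trip: every
non-key column is handed to PhoenixHBaseStorage as a DataByteArray built with
Bytes.toBytes and must read back with the column's SQL type.
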
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ccb62d1/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
index 1d82362..e0021d9 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.TestUtil.LOCALHOST;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertArrayEquals;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -30,6 +31,7 @@ import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Collection;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.end2end.HBaseManagedTimeTest;
import org.apache.pig.ExecType;
@@ -37,8 +39,10 @@ import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.joda.time.DateTime;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -182,4 +186,115 @@ public class PhoenixHBaseStorerIT extends BaseHBaseManagedTimeIT {
assertEquals(0, rs.getInt(3));
}
}
+
+ /**
+ * Tests storage of DataByteArray columns through Phoenix.
+ * Each DataByteArray is mapped to the target Phoenix data type and persisted in HBase.
+ * @throws Exception
+ */
+ @Test
+ public void testStoreWithBinaryDataTypes() throws Exception {
+
+ final String tableName = "TABLE3";
+ final Statement stmt = conn.createStatement();
+
+ stmt.execute("CREATE TABLE " + tableName +
+ " (col1 BIGINT NOT NULL, col2 INTEGER , col3 FLOAT, col4 DOUBLE , col5 TINYINT , " +
+ " col6 BOOLEAN , col7 VARBINARY CONSTRAINT my_pk PRIMARY KEY (col1))");
+
+ final Data data = Storage.resetData(pigServer);
+ final Collection<Tuple> list = Lists.newArrayList();
+
+ int rows = 10;
+ for (int i = 1; i <= rows; i++) {
+ Tuple t = tupleFactory.newTuple();
+ t.append(i);
+ t.append(new DataByteArray(Bytes.toBytes(i * 5)));
+ t.append(new DataByteArray(Bytes.toBytes(i * 10.0F)));
+ t.append(new DataByteArray(Bytes.toBytes(i * 15.0D)));
+ t.append(new DataByteArray(Bytes.toBytes(i)));
+ t.append(new DataByteArray(Bytes.toBytes( i % 2 == 0)));
+ t.append(new DataByteArray(Bytes.toBytes(i)));
+ list.add(t);
+ }
+ data.set("in", "col1:int,col2:bytearray,col3:bytearray,col4:bytearray,col5:bytearray,col6:bytearray,col7:bytearray ", list);
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+
+ pigServer.registerQuery("Store A into 'hbase://" + tableName
+ + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+ + zkQuorum + "', '-batchSize 1000');");
+
+ if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+ throw new RuntimeException("Job failed", pigServer.executeBatch()
+ .get(0).getException());
+ }
+
+ final ResultSet rs = stmt
+ .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 , col5 , col6, col7 FROM %s ORDER BY col1" , tableName));
+
+ int count = 0;
+ for (int i = 1; i <= rows; i++) {
+ assertTrue(rs.next());
+ assertEquals(i, rs.getInt(1));
+ assertEquals(i * 5, rs.getInt(2));
+ assertEquals(i * 10.0F, rs.getFloat(3), 0.0);
+ assertEquals(i * 15.0D, rs.getDouble(4), 0.0);
+ assertEquals(i, rs.getInt(5));
+ assertEquals(i % 2 == 0, rs.getBoolean(6));
+ assertArrayEquals(Bytes.toBytes(i), rs.getBytes(7));
+ count++;
+ }
+ assertEquals(rows, count);
+ }
+
+ @Test
+ public void testStoreWithDateTime() throws Exception {
+
+ final String tableName = "TABLE4";
+ final Statement stmt = conn.createStatement();
+
+ stmt.execute("CREATE TABLE " + tableName +
+ " (col1 BIGINT NOT NULL, col2 DATE , col3 TIME, " +
+ " col4 TIMESTAMP CONSTRAINT my_pk PRIMARY KEY (col1))");
+
+ long now = System.currentTimeMillis();
+ final DateTime dt = new DateTime(now);
+
+ final Data data = Storage.resetData(pigServer);
+ final Collection<Tuple> list = Lists.newArrayList();
+ Tuple t = tupleFactory.newTuple();
+
+ t.append(1);
+ t.append(dt);
+ t.append(dt);
+ t.append(dt);
+
+ list.add(t);
+
+ data.set("in", "col1:int,col2:datetime,col3:datetime,col4:datetime", list);
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+
+ pigServer.registerQuery("Store A into 'hbase://" + tableName
+ + "' using " + PhoenixHBaseStorage.class.getName() + "('"
+ + zkQuorum + "', '-batchSize 1000');");
+
+ if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+ throw new RuntimeException("Job failed", pigServer.executeBatch()
+ .get(0).getException());
+ }
+
+ final ResultSet rs = stmt
+ .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 FROM %s " , tableName));
+
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals(now, rs.getDate(2).getTime());
+ assertEquals(now, rs.getTime(3).getTime());
+ assertEquals(now, rs.getTimestamp(4).getTime());
+
+ }
}
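
The TypeUtil diff that follows is the core of the change: the Pig-to-Phoenix type
mapping now also covers the Pig BYTE, BIGINTEGER and BIGDECIMAL types, and castBytes()
(made private) decodes bytearray fields with HBaseBinaryConverter, handing the raw
bytes straight through when the target column is VARBINARY or BINARY. A rough stand-in
for that casting path (CastSketch, its castBytes helper and the string type keys are
hypothetical, used here only to illustrate the dispatch; the real code switches on
PDataType):

    import java.io.IOException;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
    import org.apache.pig.data.DataByteArray;

    // Hypothetical stand-in mirroring what TypeUtil.castBytes does after this patch
    // for a few target types; the real method is private and covers more cases.
    public class CastSketch {
        private static final HBaseBinaryConverter CONVERTER = new HBaseBinaryConverter();

        static Object castBytes(DataByteArray dba, String targetSqlType) throws IOException {
            byte[] bytes = dba.get();
            switch (targetSqlType) {
                case "INTEGER":   return CONVERTER.bytesToInteger(bytes);
                case "BIGINT":    return CONVERTER.bytesToLong(bytes);
                case "DOUBLE":    return CONVERTER.bytesToDouble(bytes);
                case "BOOLEAN":   return CONVERTER.bytesToBoolean(bytes);
                case "VARBINARY":
                case "BINARY":    return bytes;   // raw bytes are kept as-is
                default:          return dba;
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(castBytes(new DataByteArray(Bytes.toBytes(7L)), "BIGINT"));    // 7
            System.out.println(castBytes(new DataByteArray(Bytes.toBytes(true)), "BOOLEAN")); // true
        }
    }
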
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ccb62d1/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index f3cacfd..1cdd66d 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
import org.apache.phoenix.schema.PDataType;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -45,7 +46,7 @@ import com.google.common.collect.ImmutableMap.Builder;
public final class TypeUtil {
private static final Log LOG = LogFactory.getLog(TypeUtil.class);
- private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
+ private static final HBaseBinaryConverter binaryConverter = new HBaseBinaryConverter();
private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();
private TypeUtil(){
@@ -97,7 +98,6 @@ public final class TypeUtil {
if (obj == null) {
return null;
}
-
PDataType sqlType;
switch (type) {
@@ -108,6 +108,7 @@ public final class TypeUtil {
sqlType = PDataType.VARCHAR;
break;
case DataType.DOUBLE:
+ case DataType.BIGDECIMAL:
sqlType = PDataType.DOUBLE;
break;
case DataType.FLOAT:
@@ -117,6 +118,7 @@ public final class TypeUtil {
sqlType = PDataType.INTEGER;
break;
case DataType.LONG:
+ case DataType.BIGINTEGER:
sqlType = PDataType.LONG;
break;
case DataType.BOOLEAN:
@@ -125,6 +127,9 @@ public final class TypeUtil {
case DataType.DATETIME:
sqlType = PDataType.DATE;
break;
+ case DataType.BYTE:
+ sqlType = PDataType.TINYINT;
+ break;
default:
throw new RuntimeException("Unknown type " + obj.getClass().getName()
+ " passed to PhoenixHBaseStorage");
@@ -150,16 +155,17 @@ public final class TypeUtil {
if(inferredPType == null) {
return null;
}
-
- if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
+
+ if(inferredPType == PDataType.VARBINARY) {
try {
o = castBytes(o, targetPhoenixType);
- inferredPType = getType(o, DataType.findType(o));
+ if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) {
+ inferredPType = getType(o, DataType.findType(o));
+ }
} catch (IOException e) {
throw new RuntimeException("Error while casting bytes for object " +o);
}
}
-
if(inferredPType == PDataType.DATE) {
int inferredSqlType = targetPhoenixType.getSqlType();
@@ -192,42 +198,36 @@ public final class TypeUtil {
* @return Object
* @throws IOException
*/
- public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+ private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
byte[] bytes = ((DataByteArray)o).get();
switch(targetPhoenixType) {
case CHAR:
case VARCHAR:
- return utf8Converter.bytesToCharArray(bytes);
+ return binaryConverter.bytesToCharArray(bytes);
case UNSIGNED_SMALLINT:
case SMALLINT:
- return utf8Converter.bytesToInteger(bytes).shortValue();
+ return binaryConverter.bytesToInteger(bytes).shortValue();
case UNSIGNED_TINYINT:
case TINYINT:
- return utf8Converter.bytesToInteger(bytes).byteValue();
+ return binaryConverter.bytesToInteger(bytes).byteValue();
case UNSIGNED_INT:
case INTEGER:
- return utf8Converter.bytesToInteger(bytes);
+ return binaryConverter.bytesToInteger(bytes);
case BOOLEAN:
- return utf8Converter.bytesToBoolean(bytes);
- case DECIMAL:
- return utf8Converter.bytesToBigDecimal(bytes);
+ return binaryConverter.bytesToBoolean(bytes);
case FLOAT:
case UNSIGNED_FLOAT:
- return utf8Converter.bytesToFloat(bytes);
+ return binaryConverter.bytesToFloat(bytes);
case DOUBLE:
case UNSIGNED_DOUBLE:
- return utf8Converter.bytesToDouble(bytes);
+ return binaryConverter.bytesToDouble(bytes);
case UNSIGNED_LONG:
case LONG:
- return utf8Converter.bytesToLong(bytes);
- case TIME:
- case TIMESTAMP:
- case DATE:
- case UNSIGNED_TIME:
- case UNSIGNED_TIMESTAMP:
- case UNSIGNED_DATE:
- return utf8Converter.bytesToDateTime(bytes);
+ return binaryConverter.bytesToLong(bytes);
+ case VARBINARY :
+ case BINARY:
+ return bytes;
default:
return o;
}