You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/04/21 18:28:23 UTC
[2/3] PHOENIX-11 Pig Loader (RaviMagham)
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
index 3b0551f..e2521c5 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
@@ -18,32 +18,40 @@
package org.apache.phoenix.pig;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+
/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage}
- *
- *
+ * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
*
*/
public class PhoenixPigConfiguration {
private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
+ private PhoenixPigConfigurationUtil util;
+
/**
* Speculative execution of Map tasks
*/
@@ -58,106 +66,232 @@ public class PhoenixPigConfiguration {
public static final String TABLE_NAME = "phoenix.hbase.table.name";
+ public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
+
public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+ public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
+
+ public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+
public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+ // the columns to project, given as part of LOAD.
+ public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
+
+ public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
+
+ // the delimiter supported during LOAD and STORE when projected columns are given.
+ public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+
public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
- private final Configuration conf;
+ public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
- private Connection conn;
- private List<ColumnInfo> columnMetadataList;
+ private final Configuration conf;
public PhoenixPigConfiguration(Configuration conf) {
this.conf = conf;
+ this.util = new PhoenixPigConfigurationUtil();
}
public void configure(String server, String tableName, long batchSize) {
- conf.set(SERVER_NAME, server);
- conf.set(TABLE_NAME, tableName);
- conf.setLong(UPSERT_BATCH_SIZE, batchSize);
- conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
- conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
+ configure(server,tableName,batchSize,null);
+ }
+
+ public void configure(String server, String tableName, long batchSize, String columns) {
+ conf.set(SERVER_NAME, server);
+ conf.set(TABLE_NAME, tableName);
+ conf.setLong(UPSERT_BATCH_SIZE, batchSize);
+ if (isNotEmpty(columns)) {
+ conf.set(UPSERT_COLUMNS, columns);
+ }
+ conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
+ conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
}
+
/**
* Creates a {@link Connection} with autoCommit set to false.
* @throws SQLException
*/
public Connection getConnection() throws SQLException {
- Properties props = new Properties();
- conn = DriverManager.getConnection(QueryUtil.getUrl(this.conf.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
- conn.setAutoCommit(false);
-
- setup(conn);
-
- return conn;
+ return getUtil().getConnection(getConfiguration());
}
- /**
- * This method creates the Upsert statement and the Column Metadata
- * for the Pig query using {@link PhoenixHBaseStorage}. It also
- * determines the batch size based on user provided options.
- *
- * @param conn
- * @throws SQLException
- */
- public void setup(Connection conn) throws SQLException {
- // Reset batch size
- long batchSize = getBatchSize() <= 0 ? ((PhoenixConnection) conn).getMutateBatchSize() : getBatchSize();
- conf.setLong(UPSERT_BATCH_SIZE, batchSize);
-
- if (columnMetadataList == null) {
- columnMetadataList = new ArrayList<ColumnInfo>();
- String[] tableMetadata = getTableMetadata(getTableName());
- ResultSet rs = conn.getMetaData().getColumns(null, tableMetadata[0], tableMetadata[1], null);
- while (rs.next()) {
- columnMetadataList.add(new ColumnInfo(rs.getString(QueryUtil.COLUMN_NAME_POSITION), rs.getInt(QueryUtil.DATA_TYPE_POSITION)));
- }
- }
-
- // Generating UPSERT statement without column name information.
- String upsertStmt = QueryUtil.constructGenericUpsertStatement(getTableName(), columnMetadataList.size());
- LOG.info("Phoenix Upsert Statement: " + upsertStmt);
- conf.set(UPSERT_STATEMENT, upsertStmt);
- }
-
- public String getUpsertStatement() {
- return conf.get(UPSERT_STATEMENT);
+ public String getUpsertStatement() throws SQLException {
+ return getUtil().getUpsertStatement(getConfiguration(), getTableName());
}
- public long getBatchSize() {
- return conf.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+ public long getBatchSize() throws SQLException {
+ return getUtil().getBatchSize(getConfiguration());
}
-
public String getServer() {
return conf.get(SERVER_NAME);
}
- public List<ColumnInfo> getColumnMetadataList() {
- return columnMetadataList;
+ public List<ColumnInfo> getColumnMetadataList() throws SQLException {
+ return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
+ }
+
+ public String getUpsertColumns() {
+ return conf.get(UPSERT_COLUMNS);
}
public String getTableName() {
return conf.get(TABLE_NAME);
}
-
- private String[] getTableMetadata(String table) {
- String[] schemaAndTable = table.split("\\.");
- assert schemaAndTable.length >= 1;
-
- if (schemaAndTable.length == 1) {
- return new String[] { "", schemaAndTable[0] };
- }
-
- return new String[] { schemaAndTable[0], schemaAndTable[1] };
- }
-
public Configuration getConfiguration() {
return this.conf;
}
+
+ public String getSelectStatement() throws SQLException {
+ return getUtil().getSelectStatement(getConfiguration(), getTableName());
+ }
+
+ public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
+ return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
+ }
+
+ public void setServerName(final String zookeeperQuorum) {
+ this.conf.set(SERVER_NAME, zookeeperQuorum);
+ }
+
+ public void setTableName(final String tableName) {
+ Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
+ this.conf.set(TABLE_NAME, tableName);
+ }
+
+ public void setSelectStatement(final String selectStatement) {
+ this.conf.set(SELECT_STATEMENT, selectStatement);
+ }
+ public void setSelectColumns(String selectColumns) {
+ this.conf.set(SELECT_COLUMNS, selectColumns);
+ }
+
+ public PhoenixPigConfigurationUtil getUtil() {
+ return this.util;
+ }
+
+
+ @VisibleForTesting
+ static class PhoenixPigConfigurationUtil {
+
+ public Connection getConnection(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ Properties props = new Properties();
+ final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
+ conn.setAutoCommit(false);
+ return conn;
+ }
+
+ public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+ if(isNotEmpty(columnInfoStr)) {
+ return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+ }
+ final Connection connection = getConnection(configuration);
+ String upsertColumns = configuration.get(UPSERT_COLUMNS);
+ List<String> upsertColumnList = null;
+ if(isNotEmpty(upsertColumns)) {
+ final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+ upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+ LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+ ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+ ));
+ }
+ List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+ // we put the encoded column infos in the Configuration for reuse.
+ configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+ closeConnection(connection);
+ return columnMetadataList;
+ }
+
+ public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ String upsertStmt = configuration.get(UPSERT_STATEMENT);
+ if(isNotEmpty(upsertStmt)) {
+ return upsertStmt;
+ }
+ final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+ final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
+ if (useUpsertColumns) {
+ // Generating UPSERT statement with the explicit upsert column names.
+ upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+ LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
+ } else {
+ // Generating UPSERT statement without column name information.
+ upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+ LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+ }
+ configuration.set(UPSERT_STATEMENT, upsertStmt);
+ return upsertStmt;
+
+ }
+
+ public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+ if(isNotEmpty(columnInfoStr)) {
+ return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+ }
+ final Connection connection = getConnection(configuration);
+ String selectColumns = configuration.get(SELECT_COLUMNS);
+ List<String> selectColumnList = null;
+ if(isNotEmpty(selectColumns)) {
+ final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+ selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+ LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+ ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+ ));
+ }
+ List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+ // we put the encoded column infos in the Configuration for reuse.
+ configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+ closeConnection(connection);
+ return columnMetadataList;
+ }
+
+ public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ String selectStmt = configuration.get(SELECT_STATEMENT);
+ if(isNotEmpty(selectStmt)) {
+ return selectStmt;
+ }
+ final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
+ selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
+ LOG.info("Select Statement: "+ selectStmt);
+ configuration.set(SELECT_STATEMENT, selectStmt);
+ return selectStmt;
+ }
+
+ public long getBatchSize(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+ if(batchSize <= 0) {
+ Connection conn = getConnection(configuration);
+ batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+ closeConnection(conn);
+ }
+ configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+ return batchSize;
+ }
+
+ private void closeConnection(final Connection connection) throws SQLException {
+ if(connection != null) {
+ connection.close();
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
deleted file mode 100644
index d6c1466..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/TypeUtil.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import java.io.IOException;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
-
-import org.apache.pig.builtin.Utf8StorageConverter;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.joda.time.DateTime;
-
-import org.apache.phoenix.schema.PDataType;
-
-public class TypeUtil {
-
- private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
-
- /**
- * This method returns the most appropriate PDataType associated with
- * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as
- * inferredSqlType.
- *
- * This is later used to make a cast to targetPhoenixType accordingly. See
- * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
- *
- * @param obj
- * @return PDataType
- */
- public static PDataType getType(Object obj, byte type) {
- if (obj == null) {
- return null;
- }
-
- PDataType sqlType;
-
- switch (type) {
- case DataType.BYTEARRAY:
- sqlType = PDataType.VARBINARY;
- break;
- case DataType.CHARARRAY:
- sqlType = PDataType.VARCHAR;
- break;
- case DataType.DOUBLE:
- sqlType = PDataType.DOUBLE;
- break;
- case DataType.FLOAT:
- sqlType = PDataType.FLOAT;
- break;
- case DataType.INTEGER:
- sqlType = PDataType.INTEGER;
- break;
- case DataType.LONG:
- sqlType = PDataType.LONG;
- break;
- case DataType.BOOLEAN:
- sqlType = PDataType.BOOLEAN;
- break;
- case DataType.DATETIME:
- sqlType = PDataType.DATE;
- break;
- default:
- throw new RuntimeException("Unknown type " + obj.getClass().getName()
- + " passed to PhoenixHBaseStorage");
- }
-
- return sqlType;
-
- }
-
- /**
- * This method encodes a value with Phoenix data type. It begins
- * with checking whether an object is BINARY and makes a call to
- * {@link #castBytes(Object, PDataType)} to convery bytes to
- * targetPhoenixType
- *
- * @param o
- * @param targetPhoenixType
- * @return Object
- */
- public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
- PDataType inferredPType = getType(o, objectType);
-
- if(inferredPType == null) {
- return null;
- }
-
- if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
- try {
- o = castBytes(o, targetPhoenixType);
- inferredPType = getType(o, DataType.findType(o));
- } catch (IOException e) {
- throw new RuntimeException("Error while casting bytes for object " +o);
- }
- }
-
- if(inferredPType == PDataType.DATE) {
- int inferredSqlType = targetPhoenixType.getSqlType();
-
- if(inferredSqlType == Types.DATE) {
- return new Date(((DateTime)o).getMillis());
- }
- if(inferredSqlType == Types.TIME) {
- return new Time(((DateTime)o).getMillis());
- }
- if(inferredSqlType == Types.TIMESTAMP) {
- return new Timestamp(((DateTime)o).getMillis());
- }
- }
-
- if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
- return inferredPType.toObject(o, targetPhoenixType);
- }
-
- throw new RuntimeException(o.getClass().getName()
- + " cannot be coerced to "+targetPhoenixType.toString());
- }
-
- /**
- * This method converts bytes to the target type required
- * for Phoenix. It uses {@link Utf8StorageConverter} for
- * the conversion.
- *
- * @param o
- * @param targetPhoenixType
- * @return Object
- * @throws IOException
- */
- public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
- byte[] bytes = ((DataByteArray)o).get();
-
- switch(targetPhoenixType) {
- case CHAR:
- case VARCHAR:
- return utf8Converter.bytesToCharArray(bytes);
- case UNSIGNED_SMALLINT:
- case SMALLINT:
- return utf8Converter.bytesToInteger(bytes).shortValue();
- case UNSIGNED_TINYINT:
- case TINYINT:
- return utf8Converter.bytesToInteger(bytes).byteValue();
- case UNSIGNED_INT:
- case INTEGER:
- return utf8Converter.bytesToInteger(bytes);
- case BOOLEAN:
- return utf8Converter.bytesToBoolean(bytes);
- case DECIMAL:
- return utf8Converter.bytesToBigDecimal(bytes);
- case FLOAT:
- case UNSIGNED_FLOAT:
- return utf8Converter.bytesToFloat(bytes);
- case DOUBLE:
- case UNSIGNED_DOUBLE:
- return utf8Converter.bytesToDouble(bytes);
- case UNSIGNED_LONG:
- case LONG:
- return utf8Converter.bytesToLong(bytes);
- case TIME:
- case TIMESTAMP:
- case DATE:
- case UNSIGNED_TIME:
- case UNSIGNED_TIMESTAMP:
- case UNSIGNED_DATE:
- return utf8Converter.bytesToDateTime(bytes);
- default:
- return o;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
new file mode 100644
index 0000000..ebb9023
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ScanUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * The InputFormat class for generating the splits and creating the record readers.
+ *
+ */
+public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+ private PhoenixPigConfiguration phoenixConfiguration;
+ private Connection connection;
+ private QueryPlan queryPlan;
+
+ /**
+ * instantiated by framework
+ */
+ public PhoenixInputFormat() {
+ }
+
+ @Override
+ public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ setConf(context.getConfiguration());
+ final QueryPlan queryPlan = getQueryPlan(context);
+ try {
+ return new PhoenixRecordReader(phoenixConfiguration,queryPlan);
+ }catch(SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
+
+
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ List<InputSplit> splits = null;
+ try{
+ setConf(context.getConfiguration());
+ final QueryPlan queryPlan = getQueryPlan(context);
+ @SuppressWarnings("unused")
+ final ResultIterator iterator = queryPlan.iterator();
+ final List<KeyRange> allSplits = queryPlan.getSplits();
+ splits = generateSplits(queryPlan,allSplits);
+ } catch(SQLException sqlE) {
+ LOG.error(String.format(" Error [%s] in getSplits of PhoenixInputFormat ", sqlE.getMessage()));
+ Throwables.propagate(sqlE);
+ }
+ return splits;
+ }
+
+ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+ Preconditions.checkNotNull(qplan);
+ Preconditions.checkNotNull(splits);
+ final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+ final StatementContext context = qplan.getContext();
+ final TableRef tableRef = qplan.getTableRef();
+ for (KeyRange split : splits) {
+ final Scan splitScan = new Scan(context.getScan());
+ if (tableRef.getTable().getBucketNum() != null) {
+ KeyRange minMaxRange = context.getMinMaxRange();
+ if (minMaxRange != null) {
+ minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange);
+ split = split.intersect(minMaxRange);
+ }
+ }
+ // as the intersect code sets the actual start and stop row within the passed splitScan, we are fetching it back below.
+ if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), context.getScanRanges().useSkipScanFilter())) {
+ final PhoenixInputSplit inputSplit = new PhoenixInputSplit(KeyRange.getKeyRange(splitScan.getStartRow(), splitScan.getStopRow()));
+ psplits.add(inputSplit);
+ }
+ }
+ return psplits;
+ }
+
+ public void setConf(Configuration configuration) {
+ this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
+ }
+
+ public PhoenixPigConfiguration getConf() {
+ return this.phoenixConfiguration;
+ }
+
+ private Connection getConnection() {
+ try {
+ if (this.connection == null) {
+ this.connection = phoenixConfiguration.getConnection();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return connection;
+ }
+
+ /**
+ * Returns the query plan associated with the select query.
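+ * @param context the job context; must not be null
+ * @return the query plan compiled from the configured select statement; it is compiled once and cached in this instance
+ * @throws IOException declared by the signature
+ * @throws RuntimeException wrapping any failure while obtaining the connection or compiling the query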
+ */
+ private QueryPlan getQueryPlan(final JobContext context) throws IOException {
+ Preconditions.checkNotNull(context);
+ if(queryPlan == null) {
+ try{
+ final Connection connection = getConnection();
+ final String selectStatement = getConf().getSelectStatement();
+ Preconditions.checkNotNull(selectStatement);
+ final Statement statement = connection.createStatement();
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ this.queryPlan = pstmt.compileQuery(selectStatement);
+ } catch(Exception exception) {
+ LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+ throw new RuntimeException(exception);
+ }
+ }
+ return queryPlan;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
new file mode 100644
index 0000000..43d69b3
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *
+ * Input split class to hold the lower and upper bound range. {@link KeyRange}
+ *
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+ private KeyRange keyRange;
+
+ /**
+ * No Arg constructor
+ */
+ public PhoenixInputSplit() {
+ }
+
+ /**
+ *
+ * @param keyRange
+ */
+ public PhoenixInputSplit(final KeyRange keyRange) {
+ Preconditions.checkNotNull(keyRange);
+ this.keyRange = keyRange;
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ this.keyRange = new KeyRange ();
+ this.keyRange.readFields(input);
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ Preconditions.checkNotNull(keyRange);
+ keyRange.write(output);
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ /**
+ * @return Returns the keyRange.
+ */
+ public KeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) { return true; }
+ if (obj == null) { return false; }
+ if (!(obj instanceof PhoenixInputSplit)) { return false; }
+ PhoenixInputSplit other = (PhoenixInputSplit)obj;
+ if (keyRange == null) {
+ if (other.keyRange != null) { return false; }
+ } else if (!keyRange.equals(other.keyRange)) { return false; }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
index b9d03de..5063ed0 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
@@ -22,31 +22,35 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
-import org.apache.phoenix.pig.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
+import com.google.common.base.Preconditions;
/**
* A {@link Writable} representing a Phoenix record. This class
- * does a type mapping and sets the value accordingly in the
- * {@link PreparedStatement}
- *
+ * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
+ * b) reads the column values from the {@link ResultSet}
*
- *
*/
public class PhoenixRecord implements Writable {
private final List<Object> values;
private final ResourceFieldSchema[] fieldSchemas;
+ public PhoenixRecord() {
+ this(null);
+ }
+
public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
this.values = new ArrayList<Object>();
this.fieldSchemas = fieldSchemas;
@@ -63,20 +67,35 @@ public class PhoenixRecord implements Writable {
public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
for (int i = 0; i < columnMetadataList.size(); i++) {
Object o = values.get(i);
-
+ ColumnInfo columnInfo = columnMetadataList.get(i);
byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- Object upsertValue = convertTypeSpecificValue(o, type, columnMetadataList.get(i).getSqlType());
-
- if (upsertValue != null) {
- statement.setObject(i + 1, upsertValue, columnMetadataList.get(i).getSqlType());
- } else {
- statement.setNull(i + 1, columnMetadataList.get(i).getSqlType());
- }
+ try {
+ Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+ if (upsertValue != null) {
+ statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+ } else {
+ statement.setNull(i + 1, columnInfo.getSqlType());
+ }
+ } catch (RuntimeException re) {
+ throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+ ,columnInfo.toString(),re.getMessage()),re);
+
+ }
}
statement.execute();
}
+ /**
+  * Replaces the values held by this record with the columns of the row the
+  * result set is currently positioned on.
+  *
+  * @param rs result set to read from; must not be null
+  * @param noOfColumns number of leading columns to read; must be greater than zero
+  * @throws SQLException if a column value cannot be fetched
+  */
+ public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
+     Preconditions.checkNotNull(rs);
+     Preconditions.checkArgument(noOfColumns > 0, "No of arguments passed is <= 0");
+     values.clear();
+     // ResultSet columns are addressed from 1, hence the +1 on the zero-based offset.
+     for (int offset = 0; offset < noOfColumns; offset++) {
+         values.add(rs.getObject(offset + 1));
+     }
+ }
+
public void add(Object value) {
values.add(value);
}
@@ -86,4 +105,8 @@ public class PhoenixRecord implements Writable {
return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
}
+
+ /**
+ * Returns the list backing this record. The list itself is returned, not a copy,
+ * so changes made by the caller are visible to this record.
+ *
+ * @return the column values currently held by this record
+ */
+ public List<Object> getValues() {
+ return values;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
new file mode 100644
index 0000000..24ad1ee
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.hadoop;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * RecordReader that process the scan and returns PhoenixRecord
+ *
+ */
+public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+ private final PhoenixPigConfiguration phoenixConfiguration;
+ private final QueryPlan queryPlan;
+ private final List<ColumnInfo> columnInfos;
+ private NullWritable key = NullWritable.get();
+ private PhoenixRecord value = null;
+ private ResultIterator resultIterator = null;
+ private PhoenixResultSet resultSet;
+
+ public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
+
+ Preconditions.checkNotNull(pConfiguration);
+ Preconditions.checkNotNull(qPlan);
+ this.phoenixConfiguration = pConfiguration;
+ this.queryPlan = qPlan;
+ this.columnInfos = phoenixConfiguration.getSelectColumnMetadataList();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(resultIterator != null) {
+ try {
+ resultIterator.close();
+ } catch (SQLException e) {
+ LOG.error(" Error closing resultset.");
+ }
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+ final KeyRange keyRange = pSplit.getKeyRange();
+ final Scan splitScan = queryPlan.getContext().getScan();
+ final Scan scan = new Scan(splitScan);
+ scan.setStartRow(keyRange.getLowerRange());
+ scan.setStopRow(keyRange.getUpperRange());
+ try {
+ this.resultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+ Throwables.propagate(e);
+ }
+
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (key == null) {
+ key = NullWritable.get();
+ }
+ if (value == null) {
+ value = new PhoenixRecord();
+ }
+ Preconditions.checkNotNull(this.resultSet);
+ try {
+ if(!resultSet.next()) {
+ return false;
+ }
+ value.read(resultSet,columnInfos.size());
+ return true;
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+ Throwables.propagate(e);
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..3ea9b5b
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * A codec that turns a list of {@link ColumnInfo} into a single {@link String} and back.
+ * Entries are separated by {@code |}; each entry is whatever {@link ColumnInfo#toString()}
+ * produces and is parsed back with {@link ColumnInfo#fromString(String)}.
+ */
+public final class ColumnInfoToStringEncoderDecoder {
+
+    private static final String COLUMN_INFO_DELIMITER = "|";
+
+    private ColumnInfoToStringEncoderDecoder() {
+        // static utility, not instantiable
+    }
+
+    /**
+     * Joins the string form of every non-null column with the delimiter.
+     *
+     * @param columnInfos columns to encode; must not be null, null elements are skipped
+     * @return the delimited string, empty when there is nothing to encode
+     */
+    public static String encode(List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(columnInfos);
+        return Joiner.on(COLUMN_INFO_DELIMITER).skipNulls().join(columnInfos);
+    }
+
+    /**
+     * Splits the string on the delimiter and parses every non-empty piece.
+     *
+     * @param columnInfoStr string produced by {@link #encode(List)}; must not be null
+     * @return a new mutable list with one {@link ColumnInfo} per non-empty piece
+     */
+    public static List<ColumnInfo> decode(final String columnInfoStr) {
+        Preconditions.checkNotNull(columnInfoStr);
+        // omitEmptyStrings() guarantees the function never sees an empty piece, so no
+        // empty-string guard (and no null element) is needed here.
+        final Iterable<String> pieces =
+                Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr);
+        return Lists.newArrayList(
+                Iterables.transform(pieces, new Function<String, ColumnInfo>() {
+                    @Override
+                    public ColumnInfo apply(String colInfo) {
+                        return ColumnInfo.fromString(colInfo);
+                    }
+                }));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
new file mode 100644
index 0000000..695b506
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+
+/**
+ *
+ * Utility to generate the ResourceSchema from the list of {@link ColumnInfo}
+ *
+ */
+public final class PhoenixPigSchemaUtil {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
+
+ private PhoenixPigSchemaUtil() {
+ }
+
+ public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+
+ final ResourceSchema schema = new ResourceSchema();
+ try {
+ final List<ColumnInfo> columns = phoenixConfiguration.getSelectColumnMetadataList();
+ ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
+ int i = 0;
+ for(ColumnInfo cinfo : columns) {
+ int sqlType = cinfo.getSqlType();
+ PDataType phoenixDataType = PDataType.fromTypeId(sqlType);
+ byte pigType = TypeUtil.getPigDataTypeForPhoenixType(phoenixDataType);
+ ResourceFieldSchema field = new ResourceFieldSchema();
+ field.setType(pigType).setName(cinfo.getDisplayName());
+ fields[i++] = field;
+ }
+ schema.setFields(fields);
+ } catch(SQLException sqle) {
+ LOG.error(String.format("Error: SQLException [%s] ",sqle.getMessage()));
+ throw new IOException(sqle);
+ }
+
+ return schema;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
new file mode 100644
index 0000000..1b3a90a
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ *
+ * A function to parse the select query passed to LOAD into a Pair of <table Name, List<columns>
+ *
+ */
+public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
+
+ private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
+ private PhoenixPigConfiguration phoenixConfiguration;
+
+ public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
+ Preconditions.checkNotNull(phoenixConfiguration);
+ this.phoenixConfiguration = phoenixConfiguration;
+ }
+
+ @Override
+ public Pair<String, String> apply(final String selectStatement) {
+ Preconditions.checkNotNull(selectStatement);
+ Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
+ Preconditions.checkNotNull(this.phoenixConfiguration);
+ Connection connection = null;
+ try {
+ connection = this.phoenixConfiguration.getConnection();
+ final Statement statement = connection.createStatement();
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
+ isValidStatement(queryPlan);
+ final String tableName = queryPlan.getTableRef().getTable().getName().getString();
+ final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
+ final List<String> columns = Lists.transform(projectedColumns,
+ new Function<ColumnProjector,String>() {
+ @Override
+ public String apply(ColumnProjector column) {
+ return column.getName();
+ }
+ });
+ final String columnsAsStr = Joiner.on(",").join(columns);
+ return new Pair<String, String>(tableName, columnsAsStr);
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
+ Throwables.propagate(e);
+ } finally {
+ if(connection != null) {
+ try {
+ connection.close();
+ } catch(SQLException sqle) {
+ Throwables.propagate(sqle);
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * The method validates the statement passed to the query plan. List of conditions are
+ * <ol>
+ * <li>Is a SELECT statement</li>
+ * <li>doesn't contain ORDER BY expression</li>
+ * <li>doesn't contain LIMIT</li>
+ * <li>doesn't contain GROUP BY expression</li>
+ * <li>doesn't contain DISTINCT</li>
+ * <li>doesn't contain AGGREGATE functions</li>
+ * </ol>
+ * @param queryPlan
+ * @return
+ */
+ private boolean isValidStatement(final QueryPlan queryPlan) {
+ if(queryPlan.getStatement().getOperation() != PhoenixStatement.Operation.QUERY) {
+ throw new IllegalArgumentException("Query passed isn't a SELECT statement");
+ }
+ if(!queryPlan.getOrderBy().getOrderByExpressions().isEmpty()
+ || queryPlan.getLimit() != null
+ || (queryPlan.getGroupBy() != null && !queryPlan.getGroupBy().isEmpty())
+ || queryPlan.getStatement().isDistinct()
+ || queryPlan.getStatement().isAggregate()) {
+ throw new IllegalArgumentException("SELECT statement shouldn't contain DISTINCT or ORDER BY or LIMIT or GROUP BY expressions");
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
new file mode 100644
index 0000000..5bca30e
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TableSchemaParserFunction.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+
+
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
+/**
+ * A function to parse the table schema passed to LOAD/STORE into a Pair of
+ * {@code (table name, columns)}. The input is split on {@code /}: the first non-empty
+ * piece is the table name and the second, when present, is the column list.
+ */
+public final class TableSchemaParserFunction implements Function<String,Pair<String,String>> {
+
+    private static final char TABLE_COLUMN_DELIMITER = '/';
+
+    /**
+     * @param tableSchema the schema string; must be non-null and contain a table name
+     * @return pair of table name and columns; the columns are {@code null} when the input
+     *         has no second piece. Pieces after the second are ignored.
+     * @throws IllegalArgumentException if no table name can be extracted
+     */
+    @Override
+    public Pair<String, String> apply(final String tableSchema) {
+        Preconditions.checkNotNull(tableSchema);
+        Preconditions.checkArgument(!tableSchema.isEmpty(), "HBase Table name is empty!!");
+
+        final String[] tokens = Iterables.toArray(Splitter.on(TABLE_COLUMN_DELIMITER).
+                trimResults().omitEmptyStrings().split(tableSchema) , String.class);
+        // Input made only of delimiters or whitespace ("/", "  ") survives the isEmpty()
+        // check above but yields no tokens; reject it instead of indexing past the array.
+        Preconditions.checkArgument(tokens.length > 0, "HBase Table name is empty!!");
+        final String tableName = tokens[0];
+        final String columns = (tokens.length > 1) ? tokens[1] : null;
+        return new Pair<String, String>(tableName, columns);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
new file mode 100644
index 0000000..f3cacfd
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.pig.util;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.joda.time.DateTime;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
+public final class TypeUtil {
+
+ private static final Log LOG = LogFactory.getLog(TypeUtil.class);
+ private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter();
+ private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();
+
+ private TypeUtil(){
+ }
+
+ /**
+  * Builds the immutable lookup from Phoenix data types to Pig data type codes.
+  *
+  * @return map from Phoenix type to the matching Pig {@link DataType} byte constant
+  */
+ private static ImmutableMap<PDataType, Byte> init() {
+     return new Builder<PDataType,Byte>()
+             .put(PDataType.LONG,DataType.LONG)
+             .put(PDataType.VARBINARY,DataType.BYTEARRAY)
+             .put(PDataType.CHAR,DataType.CHARARRAY)
+             .put(PDataType.VARCHAR,DataType.CHARARRAY)
+             .put(PDataType.DOUBLE,DataType.DOUBLE)
+             .put(PDataType.FLOAT,DataType.FLOAT)
+             .put(PDataType.INTEGER,DataType.INTEGER)
+             .put(PDataType.TINYINT,DataType.INTEGER)
+             .put(PDataType.SMALLINT,DataType.INTEGER)
+             .put(PDataType.DECIMAL,DataType.BIGDECIMAL)
+             .put(PDataType.TIME,DataType.DATETIME)
+             .put(PDataType.TIMESTAMP,DataType.DATETIME)
+             .put(PDataType.BOOLEAN,DataType.BOOLEAN)
+             .put(PDataType.DATE,DataType.DATETIME)
+             .put(PDataType.UNSIGNED_DATE,DataType.DATETIME)
+             .put(PDataType.UNSIGNED_DOUBLE,DataType.DOUBLE)
+             .put(PDataType.UNSIGNED_FLOAT,DataType.FLOAT)
+             .put(PDataType.UNSIGNED_INT,DataType.INTEGER)
+             .put(PDataType.UNSIGNED_LONG,DataType.LONG)
+             .put(PDataType.UNSIGNED_SMALLINT,DataType.INTEGER)
+             .put(PDataType.UNSIGNED_TIME,DataType.DATETIME)
+             .put(PDataType.UNSIGNED_TIMESTAMP,DataType.DATETIME)
+             .put(PDataType.UNSIGNED_TINYINT,DataType.INTEGER)
+             .build();
+ }
+ /**
+  * Picks the Phoenix type that corresponds to the given Pig type code. Pig DATETIME is
+  * reported as Phoenix DATE; the cast to the actual target type happens later in
+  * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}.
+  *
+  * @param obj the value being converted; only checked for null and used in the error message
+  * @param type the Pig {@link DataType} code of the value
+  * @return the inferred Phoenix type, or {@code null} when {@code obj} is null
+  * @throws RuntimeException if the Pig type code is not one of the handled types
+  */
+ public static PDataType getType(Object obj, byte type) {
+     if (obj == null) {
+         return null;
+     }
+
+     switch (type) {
+     case DataType.BYTEARRAY:
+         return PDataType.VARBINARY;
+     case DataType.CHARARRAY:
+         return PDataType.VARCHAR;
+     case DataType.DOUBLE:
+         return PDataType.DOUBLE;
+     case DataType.FLOAT:
+         return PDataType.FLOAT;
+     case DataType.INTEGER:
+         return PDataType.INTEGER;
+     case DataType.LONG:
+         return PDataType.LONG;
+     case DataType.BOOLEAN:
+         return PDataType.BOOLEAN;
+     case DataType.DATETIME:
+         return PDataType.DATE;
+     default:
+         throw new RuntimeException("Unknown type " + obj.getClass().getName()
+                 + " passed to PhoenixHBaseStorage");
+     }
+ }
+
+ /**
+ * This method encodes a value with Phoenix data type. It begins
+ * with checking whether an object is BINARY and makes a call to
+ * {@link #castBytes(Object, PDataType)} to convery bytes to
+ * targetPhoenixType
+ *
+ * @param o
+ * @param targetPhoenixType
+ * @return Object
+ */
+ public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+ PDataType inferredPType = getType(o, objectType);
+
+ if(inferredPType == null) {
+ return null;
+ }
+
+ if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) {
+ try {
+ o = castBytes(o, targetPhoenixType);
+ inferredPType = getType(o, DataType.findType(o));
+ } catch (IOException e) {
+ throw new RuntimeException("Error while casting bytes for object " +o);
+ }
+ }
+
+ if(inferredPType == PDataType.DATE) {
+ int inferredSqlType = targetPhoenixType.getSqlType();
+
+ if(inferredSqlType == Types.DATE) {
+ return new Date(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIME) {
+ return new Time(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIMESTAMP) {
+ return new Timestamp(((DateTime)o).getMillis());
+ }
+ }
+
+ if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+ return inferredPType.toObject(o, targetPhoenixType);
+ }
+
+ throw new RuntimeException(o.getClass().getName()
+ + " cannot be coerced to "+targetPhoenixType.toString());
+ }
+
+ /**
+  * Decodes the bytes of a Pig {@link DataByteArray} into the Java value matching the
+  * target Phoenix type, using {@link Utf8StorageConverter}.
+  *
+  * @param o the value to decode; cast to {@link DataByteArray}
+  * @param targetPhoenixType the Phoenix type the bytes should be interpreted as
+  * @return the decoded value, or {@code o} itself when the target type has no decoder here
+  * @throws IOException if the converter fails
+  */
+ public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+     final byte[] rawBytes = ((DataByteArray)o).get();
+
+     switch(targetPhoenixType) {
+     // character data
+     case CHAR:
+     case VARCHAR:
+         return utf8Converter.bytesToCharArray(rawBytes);
+     // integral types: narrow ones are parsed as Integer and then narrowed
+     case TINYINT:
+     case UNSIGNED_TINYINT:
+         return utf8Converter.bytesToInteger(rawBytes).byteValue();
+     case SMALLINT:
+     case UNSIGNED_SMALLINT:
+         return utf8Converter.bytesToInteger(rawBytes).shortValue();
+     case INTEGER:
+     case UNSIGNED_INT:
+         return utf8Converter.bytesToInteger(rawBytes);
+     case LONG:
+     case UNSIGNED_LONG:
+         return utf8Converter.bytesToLong(rawBytes);
+     // floating point and decimal
+     case FLOAT:
+     case UNSIGNED_FLOAT:
+         return utf8Converter.bytesToFloat(rawBytes);
+     case DOUBLE:
+     case UNSIGNED_DOUBLE:
+         return utf8Converter.bytesToDouble(rawBytes);
+     case DECIMAL:
+         return utf8Converter.bytesToBigDecimal(rawBytes);
+     case BOOLEAN:
+         return utf8Converter.bytesToBoolean(rawBytes);
+     // every date/time flavour is decoded to a DateTime
+     case DATE:
+     case TIME:
+     case TIMESTAMP:
+     case UNSIGNED_DATE:
+     case UNSIGNED_TIME:
+     case UNSIGNED_TIMESTAMP:
+         return utf8Converter.bytesToDateTime(rawBytes);
+     default:
+         return o;
+     }
+ }
+
+ /**
+  * Transforms a {@link PhoenixRecord} into a Pig {@link Tuple}, converting each column
+  * value to the Pig type declared by the field schema at the same position.
+  *
+  * @param record the record whose values are converted
+  * @param projectedColumns field schemas, one per record value and in the same order
+  * @return the populated tuple, or {@code null} when the record has no values, the schemas
+  *         are null, or the number of schemas differs from the number of values
+  * @throws IOException a {@link PigException} carrying the original failure as its cause
+  *         when a value cannot be converted or a field has an unsupported Pig type
+  */
+ public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+
+     List<Object> columnValues = record.getValues();
+     if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
+         return null;
+     }
+     int columns = columnValues.size();
+     Tuple tuple = TupleFactory.getInstance().newTuple(columns);
+     try {
+         for(int i = 0 ; i < columns ; i++) {
+             final ResourceFieldSchema fieldSchema = projectedColumns[i];
+             Object object = columnValues.get(i);
+             if (object == null) {
+                 tuple.set(i, null);
+                 continue;
+             }
+
+             switch(fieldSchema.getType()) {
+             case DataType.BYTEARRAY:
+                 byte[] bytes = PDataType.fromTypeId(PDataType.BINARY.getSqlType()).toBytes(object);
+                 tuple.set(i,new DataByteArray(bytes,0,bytes.length));
+                 break;
+             case DataType.CHARARRAY:
+                 tuple.set(i,DataType.toString(object));
+                 break;
+             case DataType.DOUBLE:
+                 tuple.set(i,DataType.toDouble(object));
+                 break;
+             case DataType.FLOAT:
+                 tuple.set(i,DataType.toFloat(object));
+                 break;
+             case DataType.INTEGER:
+                 tuple.set(i,DataType.toInteger(object));
+                 break;
+             case DataType.LONG:
+                 tuple.set(i,DataType.toLong(object));
+                 break;
+             case DataType.BOOLEAN:
+                 tuple.set(i,DataType.toBoolean(object));
+                 break;
+             case DataType.DATETIME:
+                 tuple.set(i,DataType.toDateTime(object));
+                 break;
+             default:
+                 throw new RuntimeException(String.format(" Not supported [%s] pig type" , fieldSchema));
+             }
+         }
+     } catch( Exception ex) {
+         final String errorMsg = String.format(" Error transforming PhoenixRecord to Tuple [%s] ", ex.getMessage());
+         // Log and rethrow with the cause attached so the failing conversion can be traced.
+         LOG.error(errorMsg, ex);
+         throw new PigException(errorMsg, ex);
+     }
+     return tuple;
+ }
+
+ /**
+  * Returns the Pig data type mapped to the given Phoenix data type.
+  *
+  * @param phoenixDataType the Phoenix type to look up; must not be null
+  * @return the Pig {@link DataType} code, or {@code null} when the Phoenix type has no mapping
+  */
+ public static Byte getPigDataTypeForPhoenixType(final PDataType phoenixDataType) {
+     Preconditions.checkNotNull(phoenixDataType);
+     final Byte pigDataType = phoenixTypeToPigDataType.get(phoenixDataType);
+     if(LOG.isDebugEnabled()) {
+         // Unbox explicitly: handing the boxed Byte to findTypeName would bind to the
+         // Object overload and name the wrapper's own type, not the Pig type it encodes.
+         final String pigTypeName = (pigDataType == null)
+                 ? "unmapped"
+                 : DataType.findTypeName(pigDataType.byteValue());
+         LOG.debug(String.format(" For PhoenixDataType [%s] , pigDataType is [%s] " , phoenixDataType.getSqlTypeName() , pigTypeName));
+     }
+     return pigDataType;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
deleted file mode 100644
index 1360774..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixHBaseStorageTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Collection;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.pig.PhoenixHBaseStorage;
-import org.apache.phoenix.util.ConfigUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.pig.ExecType;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.builtin.mock.Storage;
-import org.apache.pig.builtin.mock.Storage.Data;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests for {@link PhoenixHBaseStorage}
- *
- */
-public class PhoenixHBaseStorageTest {
- private static TupleFactory tupleFactory;
- private static HBaseTestingUtility hbaseTestUtil;
- private static String zkQuorum;
- private static Connection conn;
- private static PigServer pigServer;
-
- @BeforeClass
- public static void setUp() throws Exception {
- hbaseTestUtil = new HBaseTestingUtility();
- Configuration conf = hbaseTestUtil.getConfiguration();
- ConfigUtil.setReplicationConfigIfAbsent(conf);
- hbaseTestUtil.startMiniCluster();
-
- Class.forName(PhoenixDriver.class.getName());
- zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
- conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
- + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
-
- // Pig variables
- pigServer = new PigServer(ExecType.LOCAL);
- tupleFactory = TupleFactory.getInstance();
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- conn.close();
- PhoenixDriver.INSTANCE.close();
- hbaseTestUtil.shutdownMiniCluster();
- pigServer.shutdown();
- }
-
- /**
- * Basic test - writes data to a Phoenix table and compares the data written
- * to expected
- *
- * @throws Exception
- */
- @Test
- public void testStorer() throws Exception {
- final String tableName = "TABLE1";
- final Statement stmt = conn.createStatement();
-
- stmt.execute("CREATE TABLE " + tableName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
-
- final Data data = Storage.resetData(pigServer);
- final Collection<Tuple> list = Lists.newArrayList();
-
- // Create input dataset
- int rows = 100;
- for (int i = 0; i < rows; i++) {
- Tuple t = tupleFactory.newTuple();
- t.append(i);
- t.append("a" + i);
- list.add(t);
- }
- data.set("in", "id:int, name:chararray", list);
-
- pigServer.setBatchOn();
- pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
-
- pigServer.registerQuery("Store A into 'hbase://" + tableName
- + "' using " + PhoenixHBaseStorage.class.getName() + "('"
- + zkQuorum + "', '-batchSize 1000');");
-
- // Now run the Pig script
- if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
- throw new RuntimeException("Job failed", pigServer.executeBatch()
- .get(0).getException());
- }
-
- // Compare data in Phoenix table to the expected
- final ResultSet rs = stmt
- .executeQuery("SELECT id, name FROM table1 ORDER BY id");
-
- for (int i = 0; i < rows; i++) {
- assertTrue(rs.next());
- assertEquals(i, rs.getInt(1));
- assertEquals("a" + i, rs.getString(2));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
new file mode 100644
index 0000000..0337563
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.SQLException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+
+/**
+ * Unit test covering the basic accessors of PhoenixPigConfiguration.
+ */
+public class PhoenixPigConfigurationTest {
+
+    /**
+     * Configures an instance with a server, a table name and a batch size, then checks
+     * that the corresponding getters hand the same values back.
+     */
+    @Test
+    public void testBasicConfiguration() throws SQLException {
+        final String expectedServer = "localhost";
+        final String expectedTable = "TABLE";
+        final long expectedBatchSize = 100;
+
+        final Configuration hadoopConf = new Configuration();
+        final PhoenixPigConfiguration pigConfiguration = new PhoenixPigConfiguration(hadoopConf);
+        pigConfiguration.configure(expectedServer, expectedTable, expectedBatchSize);
+
+        assertEquals(expectedServer, pigConfiguration.getServer());
+        assertEquals(expectedTable, pigConfiguration.getTableName());
+        assertEquals(expectedBatchSize, pigConfiguration.getBatchSize());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..9777bb5
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+    /**
+     * Checks that encoding a list holding a single column produces that column's
+     * {@code toString()} form.
+     */
+    @Test
+    public void testEncode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        assertEquals(columnInfo.toString(), encodedColumnInfo);
+    }
+
+    /**
+     * Checks that decoding the encoded form of a single column gives back one column
+     * with the same string form as the original.
+     */
+    @Test
+    public void testDecode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
+        assertEquals(1, decodedColumnInfo.size());
+        // Compare string forms so the check does not rely on ColumnInfo overriding equals().
+        assertEquals(columnInfo.toString(), decodedColumnInfo.get(0).toString());
+    }
+
+    /**
+     * Checks that a list made of one column and one null entry round-trips through
+     * encode and decode to a single decoded column.
+     */
+    @Test
+    public void testEncodeDecodeWithNulls() {
+        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final ColumnInfo columnInfo2 = null;
+        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1, columnInfo2));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        assertEquals(1, decodedColumnInfo.size());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
new file mode 100644
index 0000000..310128c
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests on PhoenixPigSchemaUtil: translation of selected Phoenix column metadata into a
+ * Pig ResourceSchema.
+ */
+public class PhoenixPigSchemaUtilTest {
+
+    private static final ColumnInfo ID_COLUMN = new ColumnInfo("ID", Types.BIGINT);
+    private static final ColumnInfo NAME_COLUMN = new ColumnInfo("NAME", Types.VARCHAR);
+    private static final ColumnInfo LOCATION_COLUMN = new ColumnInfo("LOCATION", Types.ARRAY);
+
+    /**
+     * A BIGINT column and a VARCHAR column are expected to come out as a LONG field and a
+     * CHARARRAY field carrying the same names.
+     */
+    @Test
+    public void testSchema() throws SQLException, IOException {
+        // Schema the utility is expected to produce.
+        final ResourceFieldSchema idField = new ResourceFieldSchema().setName("ID").setType(DataType.LONG);
+        final ResourceFieldSchema nameField = new ResourceFieldSchema().setName("NAME").setType(DataType.CHARARRAY);
+        final ResourceSchema expectedSchema =
+            new ResourceSchema().setFields(new ResourceFieldSchema[] { idField, nameField });
+
+        // Configuration stub that reports the two supported columns.
+        final List<ColumnInfo> selectedColumns = ImmutableList.of(ID_COLUMN, NAME_COLUMN);
+        final PhoenixPigConfiguration pigConfig = mock(PhoenixPigConfiguration.class);
+        when(pigConfig.getSelectColumnMetadataList()).thenReturn(selectedColumns);
+
+        final ResourceSchema actualSchema = PhoenixPigSchemaUtil.getResourceSchema(pigConfig);
+
+        assertEquals(expectedSchema.toString(), actualSchema.toString());
+    }
+
+    /**
+     * A column list that contains an ARRAY column is expected to be rejected with an
+     * IllegalDataException.
+     */
+    @Test(expected=IllegalDataException.class)
+    public void testUnSupportedTypes() throws SQLException, IOException {
+        final List<ColumnInfo> selectedColumns = ImmutableList.of(ID_COLUMN, LOCATION_COLUMN);
+        final PhoenixPigConfiguration pigConfig = mock(PhoenixPigConfiguration.class);
+        when(pigConfig.getSelectColumnMetadataList()).thenReturn(selectedColumns);
+
+        PhoenixPigSchemaUtil.getResourceSchema(pigConfig);
+        fail("We currently don't support Array type yet. WIP!!");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/a9b8eb9b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
new file mode 100644
index 0000000..3daf4e1
--- /dev/null
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Joiner;
+
+/**
+ * Unit tests validating how QuerySchemaParserFunction handles the query passed to LOAD.
+ */
+public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
+
+    /** DDL of the EMPLOYEE table shared by the tests that need an existing table. */
+    private static final String EMPLOYEE_DDL = "CREATE TABLE EMPLOYEE " +
+        " (id integer not null, name varchar, age integer,location varchar " +
+        " CONSTRAINT pk PRIMARY KEY (id))\n";
+
+    private PhoenixPigConfiguration phoenixConfiguration;
+    private Connection conn;
+    private QuerySchemaParserFunction function;
+
+    /**
+     * Builds the function under test on top of a mocked configuration whose
+     * {@code getConnection()} returns a connection opened on the test URL.
+     */
+    @Before
+    public void setUp() throws SQLException {
+        phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
+        conn = DriverManager.getConnection(getUrl());
+        Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
+        function = new QuerySchemaParserFunction(phoenixConfiguration);
+    }
+
+    /** A SELECT against a table that was never created is expected to raise a RuntimeException. */
+    @Test(expected=RuntimeException.class)
+    public void testSelectQuery() {
+        final String selectQuery = "SELECT col1 FROM test";
+        function.apply(selectQuery);
+        fail("Should fail as the table [test] doesn't exist");
+    }
+
+    /**
+     * A plain column SELECT on an existing table is expected to yield the table name and the
+     * comma-joined, upper-cased column names.
+     */
+    @Test
+    public void testValidSelectQuery() throws SQLException {
+        createTestTable(getUrl(), EMPLOYEE_DDL);
+
+        final String selectQuery = "SELECT name,age,location FROM EMPLOYEE";
+        final Pair<String,String> pair = function.apply(selectQuery);
+
+        // JUnit's contract is assertEquals(expected, actual); keeping that order makes
+        // failure messages report the right side as "expected".
+        assertEquals("EMPLOYEE", pair.getFirst());
+        assertEquals(Joiner.on(',').join("NAME","AGE","LOCATION"), pair.getSecond());
+    }
+
+    /** An UPSERT statement is expected to be rejected with a RuntimeException. */
+    @Test(expected=RuntimeException.class)
+    public void testUpsertQuery() throws SQLException {
+        createTestTable(getUrl(), EMPLOYEE_DDL);
+
+        final String upsertQuery = "UPSERT INTO EMPLOYEE (ID, NAME) VALUES (?, ?)";
+
+        function.apply(upsertQuery);
+        fail(" Function call successful despite passing an UPSERT query");
+    }
+
+    /** An aggregate SELECT is expected to be rejected with an IllegalArgumentException. */
+    @Test(expected=IllegalArgumentException.class)
+    public void testAggregationQuery() throws SQLException {
+        createTestTable(getUrl(), EMPLOYEE_DDL);
+
+        final String selectQuery = "SELECT MAX(ID) FROM EMPLOYEE";
+        function.apply(selectQuery);
+        fail(" Function call successful despite passing an aggregate query");
+    }
+
+    /** Closes the connection opened in {@link #setUp()}, if one was opened. */
+    @After
+    public void tearDown() throws SQLException {
+        // JUnit still runs @After methods when @Before fails, so conn may be unset here.
+        if (conn != null) {
+            conn.close();
+        }
+    }
+}