Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/10 01:51:48 UTC
[2/3] phoenix git commit: PHOENIX-1454 Map-Reduce-over-Phoenix-Tables
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 5f6fcdd..5abc9c8 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.pig;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
@@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
@@ -71,74 +77,76 @@ import org.apache.pig.impl.util.UDFContext;
@SuppressWarnings("rawtypes")
public class PhoenixHBaseStorage implements StoreFuncInterface {
- private PhoenixPigConfiguration config;
- private RecordWriter<NullWritable, PhoenixRecord> writer;
- private String contextSignature = null;
- private ResourceSchema schema;
- private long batchSize;
- private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
-
- // Set of options permitted
- private final static Options validOptions = new Options();
- private final static CommandLineParser parser = new GnuParser();
- private final static String SCHEMA = "_schema";
-
- private final CommandLine configuredOptions;
- private final String server;
-
- public PhoenixHBaseStorage(String server) throws ParseException {
- this(server, null);
- }
-
- public PhoenixHBaseStorage(String server, String optString)
- throws ParseException {
- populateValidOptions();
- this.server = server;
-
- String[] optsArr = optString == null ? new String[0] : optString.split(" ");
- try {
- configuredOptions = parser.parse(validOptions, optsArr);
- } catch (ParseException e) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("[-batchSize]", validOptions);
- throw e;
- }
-
- batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
- }
-
- private static void populateValidOptions() {
- validOptions.addOption("batchSize", true, "Specify upsert batch size");
- }
-
- /**
- * Returns UDFProperties based on <code>contextSignature</code>.
- */
- private Properties getUDFProperties() {
- return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
- }
-
-
- /**
- * Parse the HBase table name and configure job
- */
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- URI locationURI;
+ private Configuration config;
+ private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+ private List<ColumnInfo> columnInfo = null;
+ private String contextSignature = null;
+ private ResourceSchema schema;
+ private long batchSize;
+ private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+ // Set of options permitted
+ private final static Options validOptions = new Options();
+ private final static CommandLineParser parser = new GnuParser();
+ private final static String SCHEMA = "_schema";
+
+ private final CommandLine configuredOptions;
+ private final String server;
+
+ public PhoenixHBaseStorage(String server) throws ParseException {
+ this(server, null);
+ }
+
+ public PhoenixHBaseStorage(String server, String optString)
+ throws ParseException {
+ populateValidOptions();
+ this.server = server;
+
+ String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+ try {
+ configuredOptions = parser.parse(validOptions, optsArr);
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("[-batchSize]", validOptions);
+ throw e;
+ }
+ batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
+ }
+
+ private static void populateValidOptions() {
+ validOptions.addOption("batchSize", true, "Specify upsert batch size");
+ }
+
+ /**
+ * Returns UDFProperties based on <code>contextSignature</code>.
+ */
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+ }
+
+
+ /**
+ * Parse the HBase table name and configure job
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ URI locationURI;
try {
locationURI = new URI(location);
if (!"hbase".equals(locationURI.getScheme())) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
}
+ config = job.getConfiguration();
+ config.set(HConstants.ZOOKEEPER_QUORUM, server);
String tableName = locationURI.getAuthority();
// strip off the leading path token '/'
String columns = null;
if(!locationURI.getPath().isEmpty()) {
columns = locationURI.getPath().substring(1);
+ PhoenixConfigurationUtil.setUpsertColumnNames(config, columns);
}
- config = new PhoenixPigConfiguration(job.getConfiguration());
- config.configure(server, tableName, batchSize, columns);
-
+ PhoenixConfigurationUtil.setOutputTableName(config,tableName);
+ PhoenixConfigurationUtil.setBatchSize(config,batchSize);
String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
if (serializedSchema != null) {
schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
@@ -146,59 +154,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
} catch (URISyntaxException e) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
}
- }
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
@Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
- this.writer =writer;
- }
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ this.writer = writer;
+ try {
+ this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config);
+ } catch(SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
- @Override
- public void putNext(Tuple t) throws IOException {
+ @Override
+ public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-
- PhoenixRecord record = new PhoenixRecord(fieldSchemas);
-
+ PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
for(int i=0; i<t.size(); i++) {
- record.add(t.get(i));
+ record.add(t.get(i));
+ }
+ try {
+ this.writer.write(null, record);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- try {
- writer.write(null, record);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- }
+ }
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
- }
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- }
-
- @Override
- public void cleanupOnSuccess(String location, Job job) throws IOException {
- }
-
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
- return location;
- }
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- return outputFormat;
- }
-
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- schema = s;
- getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
- }
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return outputFormat;
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ schema = s;
+ getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+ }
}
\ No newline at end of file
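
For reference, a minimal sketch of how this StoreFunc is driven. The table
name MY_TABLE, column list ID,NAME, ZooKeeper host zk-host, and batch size
below are hypothetical placeholders, not part of this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.pig.PhoenixHBaseStorage;

    public class PhoenixHBaseStorageSketch {
        public static void main(String[] args) throws Exception {
            // In Pig Latin this is typically written as:
            //   STORE rel INTO 'hbase://MY_TABLE/ID,NAME'
            //     USING org.apache.phoenix.pig.PhoenixHBaseStorage('zk-host', '-batchSize 1000');
            PhoenixHBaseStorage storage =
                    new PhoenixHBaseStorage("zk-host", "-batchSize 1000");

            // setStoreLocation() validates the hbase:// scheme, takes the URI
            // authority as the output table and the path (minus the leading '/')
            // as an optional upsert column list, then seeds the job Configuration
            // through PhoenixConfigurationUtil.
            Job job = Job.getInstance(new Configuration());
            storage.setStoreLocation("hbase://MY_TABLE/ID,NAME", job);
        }
    }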
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
deleted file mode 100644
index c6b6ec9..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
-
-/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
- *
- */
-public class PhoenixPigConfiguration {
-
- private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
-
- private PhoenixPigConfigurationUtil util;
-
- /**
- * Speculative execution of Map tasks
- */
- public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
-
- /**
- * Speculative execution of Reduce tasks
- */
- public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
-
- public static final String SERVER_NAME = "phoenix.hbase.server.name";
-
- public static final String TABLE_NAME = "phoenix.hbase.table.name";
-
- public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
-
- public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
-
- public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
-
- public static final String SELECT_STATEMENT = "phoenix.select.stmt";
-
- public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
-
- //columns projected given as part of LOAD.
- public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
-
- public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
-
- public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
-
- // the delimiter supported during LOAD and STORE when projected columns are given.
- public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
-
- public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
-
- public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
-
- private final Configuration conf;
-
- public PhoenixPigConfiguration(Configuration conf) {
- this.conf = conf;
- this.util = new PhoenixPigConfigurationUtil();
- }
-
- public void configure(String server, String tableName, long batchSize) {
- configure(server,tableName,batchSize,null);
- }
-
- public void configure(String server, String tableName, long batchSize, String columns) {
- conf.set(SERVER_NAME, server);
- conf.set(TABLE_NAME, tableName);
- conf.setLong(UPSERT_BATCH_SIZE, batchSize);
- if (isNotEmpty(columns)) {
- conf.set(UPSERT_COLUMNS, columns);
- }
- conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
- conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
- }
-
-
- /**
- * Creates a {@link Connection} with autoCommit set to false.
- * @throws SQLException
- */
- public Connection getConnection() throws SQLException {
- return getUtil().getConnection(getConfiguration());
- }
-
- public String getUpsertStatement() throws SQLException {
- return getUtil().getUpsertStatement(getConfiguration(), getTableName());
- }
-
- public long getBatchSize() throws SQLException {
- return getUtil().getBatchSize(getConfiguration());
- }
-
- public String getServer() {
- return conf.get(SERVER_NAME);
- }
-
- public List<ColumnInfo> getColumnMetadataList() throws SQLException {
- return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public String getUpsertColumns() {
- return conf.get(UPSERT_COLUMNS);
- }
-
- public String getTableName() {
- return conf.get(TABLE_NAME);
- }
-
- public Configuration getConfiguration() {
- return this.conf;
- }
-
- public String getSelectStatement() throws SQLException {
- return getUtil().getSelectStatement(getConfiguration(), getTableName());
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
- return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public int getSelectColumnsCount() throws SQLException {
- return getUtil().getSelectColumnsCount(getConfiguration(), getTableName());
- }
-
- public SchemaType getSchemaType() {
- final String schemaTp = conf.get(SCHEMA_TYPE);
- return SchemaType.valueOf(schemaTp);
- }
-
-
- public void setServerName(final String zookeeperQuorum) {
- this.conf.set(SERVER_NAME, zookeeperQuorum);
- }
-
- public void setTableName(final String tableName) {
- Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
- this.conf.set(TABLE_NAME, tableName);
- }
-
- public void setSelectStatement(final String selectStatement) {
- this.conf.set(SELECT_STATEMENT, selectStatement);
- }
-
- public void setSelectColumns(String selectColumns) {
- this.conf.set(SELECT_COLUMNS, selectColumns);
- }
-
- public PhoenixPigConfigurationUtil getUtil() {
- return this.util;
- }
-
- public void setSchemaType(final SchemaType schemaType) {
- this.conf.set(SCHEMA_TYPE, schemaType.name());
- }
-
- public enum SchemaType {
- TABLE,
- QUERY;
- }
-
-
- @VisibleForTesting
- static class PhoenixPigConfigurationUtil {
-
- public Connection getConnection(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Properties props = new Properties();
- final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
- conn.setAutoCommit(false);
- return conn;
- }
-
- public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- String upsertColumns = configuration.get(UPSERT_COLUMNS);
- List<String> upsertColumnList = null;
- if(isNotEmpty(upsertColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
- LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
- ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
- ));
- }
- List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for reusability.
- configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String upsertStmt = configuration.get(UPSERT_STATEMENT);
- if(isNotEmpty(upsertStmt)) {
- return upsertStmt;
- }
- final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
- final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
- if (useUpsertColumns) {
- // Generating UPSERT statement with column name information.
- upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
- LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
- } else {
- // Generating UPSERT statement without column name information.
- upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
- LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
- }
- configuration.set(UPSERT_STATEMENT, upsertStmt);
- return upsertStmt;
-
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- final List<String> selectColumnList = getSelectColumnList(configuration);
- final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for reusability.
- configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- private List<String> getSelectColumnList(
- final Configuration configuration) {
- String selectColumns = configuration.get(SELECT_COLUMNS);
- List<String> selectColumnList = null;
- if(isNotEmpty(selectColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
- LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
- ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
- ));
- }
- return selectColumnList;
- }
-
- public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String selectStmt = configuration.get(SELECT_STATEMENT);
- if(isNotEmpty(selectStmt)) {
- return selectStmt;
- }
- final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
- selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
- LOG.info("Select Statement: "+ selectStmt);
- configuration.set(SELECT_STATEMENT, selectStmt);
- return selectStmt;
- }
-
- public long getBatchSize(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
- if(batchSize <= 0) {
- Connection conn = getConnection(configuration);
- batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
- closeConnection(conn);
- }
- configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
- return batchSize;
- }
-
- public int getSelectColumnsCount(Configuration configuration,
- String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- final String schemaTp = configuration.get(SCHEMA_TYPE);
- final SchemaType schemaType = SchemaType.valueOf(schemaTp);
- int count = 0;
- if(SchemaType.QUERY.equals(schemaType)) {
- List<String> selectedColumnList = getSelectColumnList(configuration);
- count = selectedColumnList == null ? 0 : selectedColumnList.size();
- } else {
- List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName);
- count = columnInfos == null ? 0 : columnInfos.size();
- }
- return count;
- }
-
- private void closeConnection(final Connection connection) throws SQLException {
- if(connection != null) {
- connection.close();
- }
- }
- }
-
-}
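
With the container class removed, its configure() duties move to static
helpers on the shared PhoenixConfigurationUtil, as the PhoenixHBaseStorage
hunk earlier in this patch shows. A rough old-to-new mapping, using only
calls that appear in this commit (an illustration, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class ConfigMigrationSketch {
        // Old: new PhoenixPigConfiguration(conf).configure(server, table, batchSize, columns)
        // New: equivalent static calls against the job Configuration.
        static void configure(Configuration conf, String server, String table,
                              long batchSize, String columns) {
            conf.set(HConstants.ZOOKEEPER_QUORUM, server);
            PhoenixConfigurationUtil.setOutputTableName(conf, table);
            PhoenixConfigurationUtil.setBatchSize(conf, batchSize);
            if (columns != null && !columns.isEmpty()) {
                PhoenixConfigurationUtil.setUpsertColumnNames(conf, columns);
            }
        }
    }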
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
deleted file mode 100644
index 2ef7914..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * The InputFormat class for generating the splits and creating the record readers.
- *
- */
-public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
- private PhoenixPigConfiguration phoenixConfiguration;
- private Connection connection;
- private QueryPlan queryPlan;
-
- /**
- * instantiated by framework
- */
- public PhoenixInputFormat() {
- }
-
- @Override
- public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- try {
- return new PhoenixRecordReader(phoenixConfiguration,queryPlan);
- }catch(SQLException sqle) {
- throw new IOException(sqle);
- }
- }
-
-
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
- return splits;
- }
-
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
- Preconditions.checkNotNull(qplan);
- Preconditions.checkNotNull(splits);
- final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
- for (List<Scan> scans : qplan.getScans()) {
- psplits.add(new PhoenixInputSplit(scans));
- }
- return psplits;
- }
-
- public void setConf(Configuration configuration) {
- this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- }
-
- public PhoenixPigConfiguration getConf() {
- return this.phoenixConfiguration;
- }
-
- private Connection getConnection() {
- try {
- if (this.connection == null) {
- this.connection = phoenixConfiguration.getConnection();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return connection;
- }
-
- /**
- * Returns the query plan associated with the select query.
- * @param context
- * @return
- * @throws IOException
- * @throws SQLException
- */
- private QueryPlan getQueryPlan(final JobContext context) throws IOException {
- Preconditions.checkNotNull(context);
- if(queryPlan == null) {
- try{
- final Connection connection = getConnection();
- final String selectStatement = getConf().getSelectStatement();
- Preconditions.checkNotNull(selectStatement);
- final Statement statement = connection.createStatement();
- final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- // Optimize the query plan so that we potentially use secondary indexes
- this.queryPlan = pstmt.optimizeQuery(selectStatement);
- // Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator();
- } catch(Exception exception) {
- LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
- throw new RuntimeException(exception);
- }
- }
- return queryPlan;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
deleted file mode 100644
index 7414d67..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- *
- * Input split class to hold the lower and upper bound range. {@link KeyRange}
- *
- */
-public class PhoenixInputSplit extends InputSplit implements Writable {
-
- private List<Scan> scans;
- private KeyRange keyRange;
-
- /**
- * No Arg constructor
- */
- public PhoenixInputSplit() {
- }
-
- /**
- *
- * @param scans
- */
- public PhoenixInputSplit(final List<Scan> scans) {
- Preconditions.checkNotNull(scans);
- Preconditions.checkState(!scans.isEmpty());
- this.scans = scans;
- init();
- }
-
- public List<Scan> getScans() {
- return scans;
- }
-
- public KeyRange getKeyRange() {
- return keyRange;
- }
-
- private void init() {
- this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int count = WritableUtils.readVInt(input);
- scans = Lists.newArrayListWithExpectedSize(count);
- for (int i = 0; i < count; i++) {
- Scan scan = new Scan();
- scan.readFields(input);
- scans.add(scan);
- }
- init();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- Preconditions.checkNotNull(scans);
- WritableUtils.writeVInt(output, scans.size());
- for (Scan scan : scans) {
- scan.write(output);
- }
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + keyRange.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- // TODO: review: it's a reasonable check to use the keyRange,
- // but it's not perfect. Do we need an equals impl?
- if (this == obj) { return true; }
- if (obj == null) { return false; }
- if (!(obj instanceof PhoenixInputSplit)) { return false; }
- PhoenixInputSplit other = (PhoenixInputSplit)obj;
- if (keyRange == null) {
- if (other.keyRange != null) { return false; }
- } else if (!keyRange.equals(other.keyRange)) { return false; }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
deleted file mode 100644
index a8d9d8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.jdbc.PhoenixStatement;
-
-/**
- *
- * {@link OutputCommitter} implementation for Pig tasks using Phoenix
- * connections to upsert to HBase
- *
- *
- *
- */
-public class PhoenixOutputCommitter extends OutputCommitter {
- private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
-
- private final PhoenixOutputFormat outputFormat;
-
- public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
- if(outputFormat == null) {
- throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
- }
- this.outputFormat = outputFormat;
- }
-
- /**
- * TODO implement rollback functionality.
- *
- * {@link PhoenixStatement#execute(String)} is buffered on the client, this makes
- * it difficult to implement rollback as once a commit is issued it's hard to go
- * back all the way to undo.
- */
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- }
-
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- commit(outputFormat.getConnection(context.getConfiguration()));
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return true;
- }
-
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
- }
-
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- }
-
- /**
- * Commit a transaction on task completion
- *
- * @param connection
- * @throws IOException
- */
- private void commit(Connection connection) throws IOException {
- try {
- if (connection == null || connection.isClosed()) {
- throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
- }
- } catch (SQLException e) {
- throw new IOException("Exception calling isClosed on connection", e);
- }
-
- try {
- LOG.debug("Commit called on task completion");
- connection.commit();
- } catch (SQLException e) {
- throw new IOException("Exception while trying to commit a connection. ", e);
- } finally {
- try {
- LOG.debug("Closing connection to database on task completion");
- connection.close();
- } catch (SQLException e) {
- LOG.warn("Exception while trying to close database connection", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
deleted file mode 100644
index 9c29f8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * {@link OutputFormat} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
- private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
-
- private Connection connection;
- private PhoenixPigConfiguration config;
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
- }
-
- /**
- * TODO Implement {@link OutputCommitter} to rollback in case of task failure
- */
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new PhoenixOutputCommitter(this);
- }
-
- @Override
- public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- try {
- return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * This method creates a database connection. A single instance is created
- * and passed around for re-use.
- *
- * @param configuration
- * @return
- * @throws IOException
- */
- synchronized Connection getConnection(Configuration configuration) throws IOException {
- if (connection != null) {
- return connection;
- }
-
- config = new PhoenixPigConfiguration(configuration);
- try {
- LOG.info("Initializing new Phoenix connection...");
- connection = config.getConnection();
- LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
- return connection;
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
deleted file mode 100644
index c18ef85..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- *
- */
-public class PhoenixRecord implements Writable {
-
- private final List<Object> values;
- private final ResourceFieldSchema[] fieldSchemas;
-
- public PhoenixRecord() {
- this(null);
- }
-
- public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
- this.values = new ArrayList<Object>();
- this.fieldSchemas = fieldSchemas;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- }
-
- public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
- for (int i = 0; i < columnMetadataList.size(); i++) {
- Object o = values.get(i);
- ColumnInfo columnInfo = columnMetadataList.get(i);
- byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- try {
- Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
- if (upsertValue != null) {
- statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
- } else {
- statement.setNull(i + 1, columnInfo.getSqlType());
- }
- } catch (RuntimeException re) {
- throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
- ,columnInfo.toString(),re.getMessage()),re);
-
- }
- }
-
- statement.execute();
- }
-
- public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
- Preconditions.checkNotNull(rs);
- Preconditions.checkArgument(noOfColumns > 0, "Number of arguments passed is <= 0");
- values.clear();
- for(int i = 1 ; i <= noOfColumns ; i++) {
- Object obj = rs.getObject(i);
- values.add(obj);
- }
- }
-
- public void add(Object value) {
- values.add(value);
- }
-
- private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
- PDataType pDataType = PDataType.fromTypeId(sqlType);
-
- return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
- }
-
- public List<Object> getValues() {
- return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
deleted file mode 100644
index f6808a8..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * RecordReader that processes the scans and returns a PhoenixRecord
- *
- */
-public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
- private final QueryPlan queryPlan;
- private final int columnsCount;
- private NullWritable key = NullWritable.get();
- private PhoenixRecord value = null;
- private ResultIterator resultIterator = null;
- private PhoenixResultSet resultSet;
-
- public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
-
- Preconditions.checkNotNull(pConfiguration);
- Preconditions.checkNotNull(qPlan);
- this.phoenixConfiguration = pConfiguration;
- this.queryPlan = qPlan;
- this.columnsCount = phoenixConfiguration.getSelectColumnsCount();
- }
-
- @Override
- public void close() throws IOException {
- if(resultIterator != null) {
- try {
- resultIterator.close();
- } catch (SQLException e) {
- LOG.error(" Error closing resultset.");
- }
- }
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
- final List<Scan> scans = pSplit.getScans();
- try {
- List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
- for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
- PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
- iterators.add(peekingResultIterator);
- }
- ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
- if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
- iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
- }
- this.resultIterator = iterator;
- this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
- Throwables.propagate(e);
- }
-
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (key == null) {
- key = NullWritable.get();
- }
- if (value == null) {
- value = new PhoenixRecord();
- }
- Preconditions.checkNotNull(this.resultSet);
- try {
- if(!resultSet.next()) {
- return false;
- }
- value.read(resultSet,columnsCount);
- return true;
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
- Throwables.propagate(e);
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
deleted file mode 100644
index c980a38..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- *
- * {@link RecordWriter} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
-
- private long numRecords = 0;
-
- private final Connection conn;
- private final PreparedStatement statement;
- private final PhoenixPigConfiguration config;
- private final long batchSize;
-
- public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
- this.conn = conn;
- this.config = config;
- this.batchSize = config.getBatchSize();
- this.statement = this.conn.prepareStatement(config.getUpsertStatement());
- }
-
-
- /**
- * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}.
- *
- */
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- }
-
- @Override
- public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {
- try {
- record.write(statement, config.getColumnMetadataList());
- numRecords++;
-
- if (numRecords % batchSize == 0) {
- LOG.debug("commit called on a batch of size : " + batchSize);
- conn.commit();
- }
- } catch (SQLException e) {
- throw new IOException("Exception while committing to database.", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
deleted file mode 100644
index 3ea9b5b..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import java.util.List;
-
-import org.apache.phoenix.util.ColumnInfo;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- *
- * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
- *
- */
-public final class ColumnInfoToStringEncoderDecoder {
-
- private static final String COLUMN_INFO_DELIMITER = "|";
-
- private ColumnInfoToStringEncoderDecoder() {
-
- }
-
- public static String encode(List<ColumnInfo> columnInfos) {
- Preconditions.checkNotNull(columnInfos);
- return Joiner.on(COLUMN_INFO_DELIMITER).
- skipNulls().join(columnInfos);
- }
-
- public static List<ColumnInfo> decode(final String columnInfoStr) {
- Preconditions.checkNotNull(columnInfoStr);
- List<ColumnInfo> columnInfos = Lists.newArrayList(
- Iterables.transform(
- Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
- new Function<String, ColumnInfo>() {
- @Override
- public ColumnInfo apply(String colInfo) {
- if (colInfo.isEmpty()) {
- return null;
- }
- return ColumnInfo.fromString(colInfo);
- }
- }));
- return columnInfos;
-
- }
-}
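
The removed codec pipe-delimits ColumnInfo.toString() values and rebuilds
them with ColumnInfo.fromString(). A small round-trip sketch against the
pre-patch API; the column names and SQL types are made up for illustration:

    import java.sql.Types;
    import java.util.List;

    import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
    import org.apache.phoenix.util.ColumnInfo;

    import com.google.common.collect.Lists;

    public class CodecRoundTripSketch {
        public static void main(String[] args) {
            List<ColumnInfo> cols = Lists.newArrayList(
                    new ColumnInfo("ID", Types.BIGINT),
                    new ColumnInfo("NAME", Types.VARCHAR));

            // encode() joins each ColumnInfo.toString() with '|';
            // decode() splits on '|' and calls ColumnInfo.fromString().
            String encoded = ColumnInfoToStringEncoderDecoder.encode(cols);
            List<ColumnInfo> decoded =
                    ColumnInfoToStringEncoderDecoder.decode(encoded);

            System.out.println(encoded);
            System.out.println(decoded.size());   // 2
        }
    }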
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index 9f8a5e4..dfba844 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -25,8 +25,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
@@ -41,24 +42,25 @@ import com.google.common.base.Preconditions;
*/
public final class PhoenixPigSchemaUtil {
- private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
+ private static final Log LOG = LogFactory.getLog(PhoenixPigSchemaUtil.class);
private PhoenixPigSchemaUtil() {
}
- public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+ public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
final ResourceSchema schema = new ResourceSchema();
try {
- List<ColumnInfo> columns = null;
- if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) {
- final String sqlQuery = phoenixConfiguration.getSelectStatement();
- Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
- final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
- columns = function.apply(sqlQuery);
- } else {
- columns = phoenixConfiguration.getSelectColumnMetadataList();
- }
+ List<ColumnInfo> columns = null;
+ final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
+ if(SchemaType.QUERY.equals(schemaType)) {
+ final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
+ final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
+ columns = function.apply(sqlQuery);
+ } else {
+ columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
+ }
ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
int i = 0;
for(ColumnInfo cinfo : columns) {
@@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil {
}
return schema;
-
}
}
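The change above re-bases schema derivation on a plain Hadoop Configuration read through PhoenixConfigurationUtil, rather than the old PhoenixPigConfiguration wrapper. A minimal caller sketch; the configuration keys PhoenixConfigurationUtil consumes are not part of this diff, so the setup is left as a comment and the call only succeeds against a reachable Phoenix cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
import org.apache.pig.ResourceSchema;

public class SchemaLookupExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Populate conf so that PhoenixConfigurationUtil.getSchemaType(conf),
        // getSelectStatement(conf) or getSelectColumnMetadataList(conf) return
        // meaningful values; the exact property keys are defined in
        // PhoenixConfigurationUtil and are elided here.
        ResourceSchema schema = PhoenixPigSchemaUtil.getResourceSchema(conf);
        // Each projected column becomes a ResourceFieldSchema with a Pig type.
        System.out.println(schema);
    }
}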
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index 1b3a90a..f0148a6 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -26,16 +26,16 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
/**
@@ -46,21 +46,20 @@ import com.google.common.collect.Lists;
public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
- private PhoenixPigConfiguration phoenixConfiguration;
+ private final Configuration configuration;
- public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
- Preconditions.checkNotNull(phoenixConfiguration);
- this.phoenixConfiguration = phoenixConfiguration;
+ public QuerySchemaParserFunction(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ this.configuration = configuration;
}
@Override
public Pair<String, String> apply(final String selectStatement) {
Preconditions.checkNotNull(selectStatement);
Preconditions.checkArgument(!selectStatement.isEmpty(), "Select query is empty");
- Preconditions.checkNotNull(this.phoenixConfiguration);
Connection connection = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
@@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
return new Pair<String, String>(tableName, columnsAsStr);
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error(" Error closing connection ");
+ throw new RuntimeException(sqle);
}
}
}
- return null;
}
/**
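One note on the error handling rewritten above: Throwables.propagate(e) does throw, but to the compiler it is an ordinary method call, so the old apply() still needed an unreachable return null to satisfy the definite-return rule. Replacing it with an explicit throw lets that dead return be deleted. A small self-contained sketch of the pattern, with parse() as a hypothetical stand-in for the compileQuery body above:

import java.sql.SQLException;

public class ExplicitThrowExample {
    public String apply(String selectStatement) {
        try {
            return parse(selectStatement);  // normal path returns directly
        } catch (SQLException e) {
            // The compiler sees this branch cannot fall through, so no
            // trailing "return null;" is needed after the try/catch.
            throw new RuntimeException(e);
        }
    }

    private String parse(String s) throws SQLException {
        return s.trim();  // hypothetical stand-in for the real parsing
    }
}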
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 52f646c..78bf3a7 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -26,60 +26,59 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.util.ColumnInfo;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> {
- private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
+ private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
+ private final Configuration configuration;
- public SqlQueryToColumnInfoFunction(
- final PhoenixPigConfiguration phoenixPigConfiguration) {
- super();
- this.phoenixConfiguration = phoenixPigConfiguration;
- }
+ public SqlQueryToColumnInfoFunction(final Configuration configuration) {
+ this.configuration = configuration;
+ }
- @Override
- public List<ColumnInfo> apply(String sqlQuery) {
- Preconditions.checkNotNull(sqlQuery);
- Connection connection = null;
- List<ColumnInfo> columnInfos = null;
+ @Override
+ public List<ColumnInfo> apply(String sqlQuery) {
+ Preconditions.checkNotNull(sqlQuery);
+ Connection connection = null;
+ List<ColumnInfo> columnInfos = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() {
- @Override
- public ColumnInfo apply(final ColumnProjector columnProjector) {
- return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
- }
-
+ @Override
+ public ColumnInfo apply(final ColumnProjector columnProjector) {
+ return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
+ }
+
});
- } catch (SQLException e) {
+ } catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error("Error closing connection!!");
+ throw new RuntimeException(sqle);
}
}
}
- return columnInfos;
- }
+ return columnInfos;
+ }
}
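Usage of the refactored function stays a one-liner: pass a Configuration that ConnectionUtil.getConnection(conf) can open a Phoenix connection from (those property keys are not shown in this diff), then apply it to a SELECT statement. The table name MY_TABLE below is purely illustrative:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.pig.util.SqlQueryToColumnInfoFunction;
import org.apache.phoenix.util.ColumnInfo;

public class ColumnInfoLookupExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumes conf carries the connection settings ConnectionUtil expects.
        SqlQueryToColumnInfoFunction toColumnInfo = new SqlQueryToColumnInfoFunction(conf);
        List<ColumnInfo> columns = toColumnInfo.apply("SELECT ID, NAME FROM MY_TABLE");
        for (ColumnInfo column : columns) {
            System.out.println(column);  // name and SQL type per projected column
        }
    }
}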
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 1cdd66d..20b56b4 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.phoenix.schema.PDataType;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -47,16 +47,16 @@ public final class TypeUtil {
private static final Log LOG = LogFactory.getLog(TypeUtil.class);
private static final HBaseBinaryConverter binaryConverter = new HBaseBinaryConverter ();
- private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();
-
- private TypeUtil(){
- }
-
- /**
- * A map of Phoenix to Pig data types.
- * @return
- */
- private static ImmutableMap<PDataType, Byte> init() {
+ private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init();
+
+ private TypeUtil(){
+ }
+
+ /**
+ * A map of Phoenix to Pig data types.
+ * @return
+ */
+ private static ImmutableMap<PDataType, Byte> init() {
final ImmutableMap.Builder<PDataType,Byte> builder = new Builder<PDataType,Byte> ();
builder.put(PDataType.LONG,DataType.LONG);
builder.put(PDataType.VARBINARY,DataType.BYTEARRAY);
@@ -84,121 +84,121 @@ public final class TypeUtil {
return builder.build();
}
/**
- * This method returns the most appropriate PDataType associated with
- * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as
- * inferredSqlType.
- *
- * This is later used to make a cast to targetPhoenixType accordingly. See
- * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
- *
- * @param obj
- * @return PDataType
- */
- public static PDataType getType(Object obj, byte type) {
- if (obj == null) {
- return null;
- }
- PDataType sqlType;
+ * This method returns the most appropriate PDataType associated with
+ * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as
+ * inferredSqlType.
+ *
+ * This is later used to make a cast to targetPhoenixType accordingly. See
+ * {@link #castPigTypeToPhoenix(Object, byte, PDataType)}
+ *
+ * @param obj
+ * @return PDataType
+ */
+ public static PDataType getType(Object obj, byte type) {
+ if (obj == null) {
+ return null;
+ }
+ PDataType sqlType;
- switch (type) {
- case DataType.BYTEARRAY:
- sqlType = PDataType.VARBINARY;
- break;
- case DataType.CHARARRAY:
- sqlType = PDataType.VARCHAR;
- break;
- case DataType.DOUBLE:
- case DataType.BIGDECIMAL:
- sqlType = PDataType.DOUBLE;
- break;
- case DataType.FLOAT:
- sqlType = PDataType.FLOAT;
- break;
- case DataType.INTEGER:
- sqlType = PDataType.INTEGER;
- break;
- case DataType.LONG:
- case DataType.BIGINTEGER:
- sqlType = PDataType.LONG;
- break;
- case DataType.BOOLEAN:
- sqlType = PDataType.BOOLEAN;
- break;
- case DataType.DATETIME:
- sqlType = PDataType.DATE;
- break;
- case DataType.BYTE:
- sqlType = PDataType.TINYINT;
- break;
- default:
- throw new RuntimeException("Unknown type " + obj.getClass().getName()
- + " passed to PhoenixHBaseStorage");
- }
+ switch (type) {
+ case DataType.BYTEARRAY:
+ sqlType = PDataType.VARBINARY;
+ break;
+ case DataType.CHARARRAY:
+ sqlType = PDataType.VARCHAR;
+ break;
+ case DataType.DOUBLE:
+ case DataType.BIGDECIMAL:
+ sqlType = PDataType.DOUBLE;
+ break;
+ case DataType.FLOAT:
+ sqlType = PDataType.FLOAT;
+ break;
+ case DataType.INTEGER:
+ sqlType = PDataType.INTEGER;
+ break;
+ case DataType.LONG:
+ case DataType.BIGINTEGER:
+ sqlType = PDataType.LONG;
+ break;
+ case DataType.BOOLEAN:
+ sqlType = PDataType.BOOLEAN;
+ break;
+ case DataType.DATETIME:
+ sqlType = PDataType.DATE;
+ break;
+ case DataType.BYTE:
+ sqlType = PDataType.TINYINT;
+ break;
+ default:
+ throw new RuntimeException("Unknown type " + obj.getClass().getName()
+ + " passed to PhoenixHBaseStorage");
+ }
- return sqlType;
+ return sqlType;
- }
+ }
- /**
- * This method encodes a value with Phoenix data type. It begins
- * with checking whether an object is BINARY and makes a call to
- * {@link #castBytes(Object, PDataType)} to convert bytes to
- * targetPhoenixType
- *
- * @param o
- * @param targetPhoenixType
- * @return Object
- */
- public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
- PDataType inferredPType = getType(o, objectType);
-
- if(inferredPType == null) {
- return null;
- }
+ /**
+ * This method encodes a value with Phoenix data type. It begins
+ * with checking whether an object is BINARY and makes a call to
+ * {@link #castBytes(Object, PDataType)} to convert bytes to
+ * targetPhoenixType
+ *
+ * @param o
+ * @param targetPhoenixType
+ * @return Object
+ */
+ public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) {
+ PDataType inferredPType = getType(o, objectType);
+
+ if(inferredPType == null) {
+ return null;
+ }
- if(inferredPType == PDataType.VARBINARY) {
- try {
- o = castBytes(o, targetPhoenixType);
- if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) {
- inferredPType = getType(o, DataType.findType(o));
- }
- } catch (IOException e) {
- throw new RuntimeException("Error while casting bytes for object " +o);
- }
- }
- if(inferredPType == PDataType.DATE) {
- int inferredSqlType = targetPhoenixType.getSqlType();
+ if(inferredPType == PDataType.VARBINARY) {
+ try {
+ o = castBytes(o, targetPhoenixType);
+ if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) {
+ inferredPType = getType(o, DataType.findType(o));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Error while casting bytes for object " +o);
+ }
+ }
+ if(inferredPType == PDataType.DATE) {
+ int inferredSqlType = targetPhoenixType.getSqlType();
- if(inferredSqlType == Types.DATE) {
- return new Date(((DateTime)o).getMillis());
- }
- if(inferredSqlType == Types.TIME) {
- return new Time(((DateTime)o).getMillis());
- }
- if(inferredSqlType == Types.TIMESTAMP) {
- return new Timestamp(((DateTime)o).getMillis());
- }
- }
-
- if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
- return inferredPType.toObject(o, targetPhoenixType);
- }
-
- throw new RuntimeException(o.getClass().getName()
- + " cannot be coerced to "+targetPhoenixType.toString());
- }
-
- /**
- * This method converts bytes to the target type required
- * for Phoenix. It uses {@link Utf8StorageConverter} for
- * the conversion.
- *
- * @param o
- * @param targetPhoenixType
- * @return Object
- * @throws IOException
- */
- private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
+ if(inferredSqlType == Types.DATE) {
+ return new Date(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIME) {
+ return new Time(((DateTime)o).getMillis());
+ }
+ if(inferredSqlType == Types.TIMESTAMP) {
+ return new Timestamp(((DateTime)o).getMillis());
+ }
+ }
+
+ if (targetPhoenixType == inferredPType || inferredPType.isCoercibleTo(targetPhoenixType)) {
+ return inferredPType.toObject(o, targetPhoenixType);
+ }
+
+ throw new RuntimeException(o.getClass().getName()
+ + " cannot be coerced to "+targetPhoenixType.toString());
+ }
+
+ /**
+ * This method converts bytes to the target type required
+ * for Phoenix. It uses {@link Utf8StorageConverter} for
+ * the conversion.
+ *
+ * @param o
+ * @param targetPhoenixType
+ * @return Object
+ * @throws IOException
+ */
+ private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException {
byte[] bytes = ((DataByteArray)o).get();
switch(targetPhoenixType) {
@@ -213,7 +213,7 @@ public final class TypeUtil {
return binaryConverter.bytesToInteger(bytes).byteValue();
case UNSIGNED_INT:
case INTEGER:
- return binaryConverter.bytesToInteger(bytes);
+ return binaryConverter.bytesToInteger(bytes);
case BOOLEAN:
return binaryConverter.bytesToBoolean(bytes);
case FLOAT:
@@ -227,9 +227,9 @@ public final class TypeUtil {
return binaryConverter.bytesToLong(bytes);
case VARBINARY :
case BINARY:
- return bytes;
+ return bytes;
default:
- return o;
+ return o;
}
}
@@ -240,7 +240,7 @@ public final class TypeUtil {
* @return
* @throws IOException
*/
- public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+ public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
List<Object> columnValues = record.getValues();
if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
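The reindented castPigTypeToPhoenix above is the core of the write path: it infers a PDataType from the Pig type byte via getType, then coerces to the target type, with a special branch that narrows an inferred DATE by the target's SQL type. A short sketch of two casts grounded in the switch above (a Pig DATETIME lands on java.sql.Timestamp when the target is TIMESTAMP):

import org.apache.phoenix.pig.util.TypeUtil;
import org.apache.phoenix.schema.PDataType;
import org.apache.pig.data.DataType;
import org.joda.time.DateTime;

public class CastExample {
    public static void main(String[] args) {
        // CHARARRAY infers VARCHAR, which matches the target directly.
        Object name = TypeUtil.castPigTypeToPhoenix(
                "phoenix", DataType.CHARARRAY, PDataType.VARCHAR);

        // DATETIME infers DATE; a TIMESTAMP target hits the Types.TIMESTAMP
        // branch and wraps the DateTime's millis in java.sql.Timestamp.
        Object ts = TypeUtil.castPigTypeToPhoenix(
                new DateTime(), DataType.DATETIME, PDataType.TIMESTAMP);

        System.out.println(name.getClass() + " / " + ts.getClass());
    }
}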
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cabb16f7/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
new file mode 100644
index 0000000..a7399c9
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Writable} representing a Phoenix record. This class
+ * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
+ * b) reads the column values from the {@link ResultSet}
+ *
+ */
+public class PhoenixPigDBWritable implements DBWritable {
+
+ private final List<Object> values;
+ private ResourceFieldSchema[] fieldSchemas;
+ private List<ColumnInfo> columnMetadataList;
+
+ public PhoenixPigDBWritable() {
+ this.values = new ArrayList<Object>();
+ }
+
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ for (int i = 0; i < columnMetadataList.size(); i++) {
+ Object o = values.get(i);
+ ColumnInfo columnInfo = columnMetadataList.get(i);
+ byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+ try {
+ Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+ if (upsertValue != null) {
+ statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+ } else {
+ statement.setNull(i + 1, columnInfo.getSqlType());
+ }
+ } catch (RuntimeException re) {
+ throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+ ,columnInfo.toString(),re.getMessage()),re);
+
+ }
+ }
+ }
+
+ public void add(Object value) {
+ values.add(value);
+ }
+
+ private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+ PDataType pDataType = PDataType.fromTypeId(sqlType);
+ return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
+ }
+
+ public List<Object> getValues() {
+ return values;
+ }
+
+ @Override
+ public void readFields(final ResultSet rs) throws SQLException {
+ Preconditions.checkNotNull(rs);
+ final int noOfColumns = rs.getMetaData().getColumnCount();
+ values.clear();
+ for(int i = 1 ; i <= noOfColumns ; i++) {
+ Object obj = rs.getObject(i);
+ values.add(obj);
+ }
+ }
+
+ public ResourceFieldSchema[] getFieldSchemas() {
+ return fieldSchemas;
+ }
+
+ public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
+ this.fieldSchemas = fieldSchemas;
+ }
+
+ public List<ColumnInfo> getColumnMetadataList() {
+ return columnMetadataList;
+ }
+
+ public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+ this.columnMetadataList = columnMetadataList;
+ }
+
+ public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
+ final List<ColumnInfo> columnMetadataList) {
+ final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
+ dbWritable.setFieldSchemas(fieldSchemas);
+ dbWritable.setColumnMetadataList(columnMetadataList);
+ return dbWritable;
+ }
+
+}
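To see the new writable end to end: the storage function builds one PhoenixPigDBWritable per Pig tuple, adds the field values in column order, and write(...) binds each one to an UPSERT with its Phoenix SQL type. A sketch assuming an already-open Phoenix Connection and an illustrative table T(ID BIGINT, NAME VARCHAR):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;

import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.phoenix.util.ColumnInfo;

import com.google.common.collect.ImmutableList;

public class WritePathExample {
    public static void upsertRow(Connection conn) throws SQLException {
        List<ColumnInfo> columns = ImmutableList.of(
                new ColumnInfo("ID", Types.BIGINT),
                new ColumnInfo("NAME", Types.VARCHAR));
        // With null field schemas, write() falls back to DataType.findType(value).
        PhoenixPigDBWritable row = PhoenixPigDBWritable.newInstance(null, columns);
        row.add(42L);        // inferred as Pig LONG -> Phoenix LONG
        row.add("phoenix");  // inferred as CHARARRAY -> VARCHAR
        PreparedStatement stmt =
                conn.prepareStatement("UPSERT INTO T (ID, NAME) VALUES (?, ?)");
        row.write(stmt);     // sets parameters 1 and 2 with their SQL types
        stmt.executeUpdate();
    }
}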