You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/09 03:09:12 UTC
[2/3] phoenix git commit: PHOENIX-1454 - Map Reduce over Phoenix
tables
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 500e403..eb2c124 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.pig;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
@@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
@@ -75,74 +81,76 @@ import org.apache.pig.impl.util.UDFContext;
@SuppressWarnings("rawtypes")
public class PhoenixHBaseStorage implements StoreFuncInterface {
- private PhoenixPigConfiguration config;
- private RecordWriter<NullWritable, PhoenixRecord> writer;
- private String contextSignature = null;
- private ResourceSchema schema;
- private long batchSize;
- private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
-
- // Set of options permitted
- private final static Options validOptions = new Options();
- private final static CommandLineParser parser = new GnuParser();
- private final static String SCHEMA = "_schema";
-
- private final CommandLine configuredOptions;
- private final String server;
-
- public PhoenixHBaseStorage(String server) throws ParseException {
- this(server, null);
- }
-
- public PhoenixHBaseStorage(String server, String optString)
- throws ParseException {
- populateValidOptions();
- this.server = server;
-
- String[] optsArr = optString == null ? new String[0] : optString.split(" ");
- try {
- configuredOptions = parser.parse(validOptions, optsArr);
- } catch (ParseException e) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("[-batchSize]", validOptions);
- throw e;
- }
-
- batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
- }
-
- private static void populateValidOptions() {
- validOptions.addOption("batchSize", true, "Specify upsert batch size");
- }
-
- /**
- * Returns UDFProperties based on <code>contextSignature</code>.
- */
- private Properties getUDFProperties() {
- return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
- }
-
-
- /**
- * Parse the HBase table name and configure job
- */
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- URI locationURI;
+ private Configuration config;
+ private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+ private List<ColumnInfo> columnInfo = null;
+ private String contextSignature = null;
+ private ResourceSchema schema;
+ private long batchSize;
+ private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+ // Set of options permitted
+ private final static Options validOptions = new Options();
+ private final static CommandLineParser parser = new GnuParser();
+ private final static String SCHEMA = "_schema";
+
+ private final CommandLine configuredOptions;
+ private final String server;
+
+ public PhoenixHBaseStorage(String server) throws ParseException {
+ this(server, null);
+ }
+
+ public PhoenixHBaseStorage(String server, String optString)
+ throws ParseException {
+ populateValidOptions();
+ this.server = server;
+
+ String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+ try {
+ configuredOptions = parser.parse(validOptions, optsArr);
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("[-batchSize]", validOptions);
+ throw e;
+ }
+ batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
+ }
+
+ private static void populateValidOptions() {
+ validOptions.addOption("batchSize", true, "Specify upsert batch size");
+ }
+
+ /**
+ * Returns UDFProperties based on <code>contextSignature</code>.
+ */
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+ }
+
+
+ /**
+ * Parse the HBase table name and configure job
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ URI locationURI;
try {
locationURI = new URI(location);
if (!"hbase".equals(locationURI.getScheme())) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
}
+ config = job.getConfiguration();
+ config.set(HConstants.ZOOKEEPER_QUORUM, server);
String tableName = locationURI.getAuthority();
// strip off the leading path token '/'
String columns = null;
if(!locationURI.getPath().isEmpty()) {
columns = locationURI.getPath().substring(1);
+ PhoenixConfigurationUtil.setUpsertColumnNames(config, columns);
}
- config = new PhoenixPigConfiguration(job.getConfiguration());
- config.configure(server, tableName, batchSize, columns);
-
+ PhoenixConfigurationUtil.setOutputTableName(config,tableName);
+ PhoenixConfigurationUtil.setBatchSize(config,batchSize);
String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
if (serializedSchema != null) {
schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
@@ -150,59 +158,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
} catch (URISyntaxException e) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
}
- }
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
@Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
- this.writer =writer;
- }
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ this.writer = writer;
+ try {
+ this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config);
+ } catch(SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
- @Override
- public void putNext(Tuple t) throws IOException {
+ @Override
+ public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-
- PhoenixRecord record = new PhoenixRecord(fieldSchemas);
-
+ PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
for(int i=0; i<t.size(); i++) {
- record.add(t.get(i));
+ record.add(t.get(i));
+ }
+ try {
+ this.writer.write(null, record);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- try {
- writer.write(null, record);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- }
+ }
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
- }
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- }
-
- @Override
- public void cleanupOnSuccess(String location, Job job) throws IOException {
- }
-
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
- return location;
- }
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- return outputFormat;
- }
-
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- schema = s;
- getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
- }
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return outputFormat;
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ schema = s;
+ getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
deleted file mode 100644
index c6b6ec9..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
-
-/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
- *
- */
-public class PhoenixPigConfiguration {
-
- private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
-
- private PhoenixPigConfigurationUtil util;
-
- /**
- * Speculative execution of Map tasks
- */
- public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
-
- /**
- * Speculative execution of Reduce tasks
- */
- public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
-
- public static final String SERVER_NAME = "phoenix.hbase.server.name";
-
- public static final String TABLE_NAME = "phoenix.hbase.table.name";
-
- public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
-
- public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
-
- public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
-
- public static final String SELECT_STATEMENT = "phoenix.select.stmt";
-
- public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
-
- //columns projected given as part of LOAD.
- public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
-
- public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
-
- public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
-
- // the delimiter supported during LOAD and STORE when projected columns are given.
- public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
-
- public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
-
- public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
-
- private final Configuration conf;
-
- public PhoenixPigConfiguration(Configuration conf) {
- this.conf = conf;
- this.util = new PhoenixPigConfigurationUtil();
- }
-
- public void configure(String server, String tableName, long batchSize) {
- configure(server,tableName,batchSize,null);
- }
-
- public void configure(String server, String tableName, long batchSize, String columns) {
- conf.set(SERVER_NAME, server);
- conf.set(TABLE_NAME, tableName);
- conf.setLong(UPSERT_BATCH_SIZE, batchSize);
- if (isNotEmpty(columns)) {
- conf.set(UPSERT_COLUMNS, columns);
- }
- conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
- conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
- }
-
-
- /**
- * Creates a {@link Connection} with autoCommit set to false.
- * @throws SQLException
- */
- public Connection getConnection() throws SQLException {
- return getUtil().getConnection(getConfiguration());
- }
-
- public String getUpsertStatement() throws SQLException {
- return getUtil().getUpsertStatement(getConfiguration(), getTableName());
- }
-
- public long getBatchSize() throws SQLException {
- return getUtil().getBatchSize(getConfiguration());
- }
-
- public String getServer() {
- return conf.get(SERVER_NAME);
- }
-
- public List<ColumnInfo> getColumnMetadataList() throws SQLException {
- return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public String getUpsertColumns() {
- return conf.get(UPSERT_COLUMNS);
- }
-
- public String getTableName() {
- return conf.get(TABLE_NAME);
- }
-
- public Configuration getConfiguration() {
- return this.conf;
- }
-
- public String getSelectStatement() throws SQLException {
- return getUtil().getSelectStatement(getConfiguration(), getTableName());
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
- return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public int getSelectColumnsCount() throws SQLException {
- return getUtil().getSelectColumnsCount(getConfiguration(), getTableName());
- }
-
- public SchemaType getSchemaType() {
- final String schemaTp = conf.get(SCHEMA_TYPE);
- return SchemaType.valueOf(schemaTp);
- }
-
-
- public void setServerName(final String zookeeperQuorum) {
- this.conf.set(SERVER_NAME, zookeeperQuorum);
- }
-
- public void setTableName(final String tableName) {
- Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
- this.conf.set(TABLE_NAME, tableName);
- }
-
- public void setSelectStatement(final String selectStatement) {
- this.conf.set(SELECT_STATEMENT, selectStatement);
- }
-
- public void setSelectColumns(String selectColumns) {
- this.conf.set(SELECT_COLUMNS, selectColumns);
- }
-
- public PhoenixPigConfigurationUtil getUtil() {
- return this.util;
- }
-
- public void setSchemaType(final SchemaType schemaType) {
- this.conf.set(SCHEMA_TYPE, schemaType.name());
- }
-
- public enum SchemaType {
- TABLE,
- QUERY;
- }
-
-
- @VisibleForTesting
- static class PhoenixPigConfigurationUtil {
-
- public Connection getConnection(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Properties props = new Properties();
- final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
- conn.setAutoCommit(false);
- return conn;
- }
-
- public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- String upsertColumns = configuration.get(UPSERT_COLUMNS);
- List<String> upsertColumnList = null;
- if(isNotEmpty(upsertColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
- LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
- ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
- ));
- }
- List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for re usability.
- configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String upsertStmt = configuration.get(UPSERT_STATEMENT);
- if(isNotEmpty(upsertStmt)) {
- return upsertStmt;
- }
- final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
- final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
- if (useUpsertColumns) {
- // Generating UPSERT statement without column name information.
- upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
- LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
- } else {
- // Generating UPSERT statement without column name information.
- upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
- LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
- }
- configuration.set(UPSERT_STATEMENT, upsertStmt);
- return upsertStmt;
-
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- final List<String> selectColumnList = getSelectColumnList(configuration);
- final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for re usability.
- configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- private List<String> getSelectColumnList(
- final Configuration configuration) {
- String selectColumns = configuration.get(SELECT_COLUMNS);
- List<String> selectColumnList = null;
- if(isNotEmpty(selectColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
- LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
- ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
- ));
- }
- return selectColumnList;
- }
-
- public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String selectStmt = configuration.get(SELECT_STATEMENT);
- if(isNotEmpty(selectStmt)) {
- return selectStmt;
- }
- final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
- selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
- LOG.info("Select Statement: "+ selectStmt);
- configuration.set(SELECT_STATEMENT, selectStmt);
- return selectStmt;
- }
-
- public long getBatchSize(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
- if(batchSize <= 0) {
- Connection conn = getConnection(configuration);
- batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
- closeConnection(conn);
- }
- configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
- return batchSize;
- }
-
- public int getSelectColumnsCount(Configuration configuration,
- String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- final String schemaTp = configuration.get(SCHEMA_TYPE);
- final SchemaType schemaType = SchemaType.valueOf(schemaTp);
- int count = 0;
- if(SchemaType.QUERY.equals(schemaType)) {
- List<String> selectedColumnList = getSelectColumnList(configuration);
- count = selectedColumnList == null ? 0 : selectedColumnList.size();
- } else {
- List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName);
- count = columnInfos == null ? 0 : columnInfos.size();
- }
- return count;
- }
-
- private void closeConnection(final Connection connection) throws SQLException {
- if(connection != null) {
- connection.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
deleted file mode 100644
index 2ef7914..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * The InputFormat class for generating the splits and creating the record readers.
- *
- */
-public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
- private PhoenixPigConfiguration phoenixConfiguration;
- private Connection connection;
- private QueryPlan queryPlan;
-
- /**
- * instantiated by framework
- */
- public PhoenixInputFormat() {
- }
-
- @Override
- public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- try {
- return new PhoenixRecordReader(phoenixConfiguration,queryPlan);
- }catch(SQLException sqle) {
- throw new IOException(sqle);
- }
- }
-
-
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
- return splits;
- }
-
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
- Preconditions.checkNotNull(qplan);
- Preconditions.checkNotNull(splits);
- final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
- for (List<Scan> scans : qplan.getScans()) {
- psplits.add(new PhoenixInputSplit(scans));
- }
- return psplits;
- }
-
- public void setConf(Configuration configuration) {
- this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- }
-
- public PhoenixPigConfiguration getConf() {
- return this.phoenixConfiguration;
- }
-
- private Connection getConnection() {
- try {
- if (this.connection == null) {
- this.connection = phoenixConfiguration.getConnection();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return connection;
- }
-
- /**
- * Returns the query plan associated with the select query.
- * @param context
- * @return
- * @throws IOException
- * @throws SQLException
- */
- private QueryPlan getQueryPlan(final JobContext context) throws IOException {
- Preconditions.checkNotNull(context);
- if(queryPlan == null) {
- try{
- final Connection connection = getConnection();
- final String selectStatement = getConf().getSelectStatement();
- Preconditions.checkNotNull(selectStatement);
- final Statement statement = connection.createStatement();
- final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- // Optimize the query plan so that we potentially use secondary indexes
- this.queryPlan = pstmt.optimizeQuery(selectStatement);
- // Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator();
- } catch(Exception exception) {
- LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
- throw new RuntimeException(exception);
- }
- }
- return queryPlan;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
deleted file mode 100644
index b1d015a..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- *
- * Input split class to hold the lower and upper bound range. {@link KeyRange}
- *
- */
-public class PhoenixInputSplit extends InputSplit implements Writable {
-
- private List<Scan> scans;
- private KeyRange keyRange;
-
- /**
- * No Arg constructor
- */
- public PhoenixInputSplit() {
- }
-
- /**
- *
- * @param keyRange
- */
- public PhoenixInputSplit(final List<Scan> scans) {
- Preconditions.checkNotNull(scans);
- Preconditions.checkState(!scans.isEmpty());
- this.scans = scans;
- init();
- }
-
- public List<Scan> getScans() {
- return scans;
- }
-
- public KeyRange getKeyRange() {
- return keyRange;
- }
-
- private void init() {
- this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int count = WritableUtils.readVInt(input);
- scans = Lists.newArrayListWithExpectedSize(count);
- for (int i = 0; i < count; i++) {
- byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
- input.readFully(protoScanBytes);
- ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
- Scan scan = ProtobufUtil.toScan(protoScan);
- scans.add(scan);
- }
- init();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- Preconditions.checkNotNull(scans);
- WritableUtils.writeVInt(output, scans.size());
- for (Scan scan : scans) {
- ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
- byte[] protoScanBytes = protoScan.toByteArray();
- WritableUtils.writeVInt(output, protoScanBytes.length);
- output.write(protoScanBytes);
- }
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + keyRange.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- // TODO: review: it's a reasonable check to use the keyRange,
- // but it's not perfect. Do we need an equals impl?
- if (this == obj) { return true; }
- if (obj == null) { return false; }
- if (!(obj instanceof PhoenixInputSplit)) { return false; }
- PhoenixInputSplit other = (PhoenixInputSplit)obj;
- if (keyRange == null) {
- if (other.keyRange != null) { return false; }
- } else if (!keyRange.equals(other.keyRange)) { return false; }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
deleted file mode 100644
index a8d9d8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.jdbc.PhoenixStatement;
-
-/**
- *
- * {@link OutputCommitter} implementation for Pig tasks using Phoenix
- * connections to upsert to HBase
- *
- *
- *
- */
-public class PhoenixOutputCommitter extends OutputCommitter {
- private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
-
- private final PhoenixOutputFormat outputFormat;
-
- public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
- if(outputFormat == null) {
- throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
- }
- this.outputFormat = outputFormat;
- }
-
- /**
- * TODO implement rollback functionality.
- *
- * {@link PhoenixStatement#execute(String)} is buffered on the client, this makes
- * it difficult to implement rollback as once a commit is issued it's hard to go
- * back all the way to undo.
- */
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- }
-
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- commit(outputFormat.getConnection(context.getConfiguration()));
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return true;
- }
-
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
- }
-
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- }
-
- /**
- * Commit a transaction on task completion
- *
- * @param connection
- * @throws IOException
- */
- private void commit(Connection connection) throws IOException {
- try {
- if (connection == null || connection.isClosed()) {
- throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
- }
- } catch (SQLException e) {
- throw new IOException("Exception calling isClosed on connection", e);
- }
-
- try {
- LOG.debug("Commit called on task completion");
- connection.commit();
- } catch (SQLException e) {
- throw new IOException("Exception while trying to commit a connection. ", e);
- } finally {
- try {
- LOG.debug("Closing connection to database on task completion");
- connection.close();
- } catch (SQLException e) {
- LOG.warn("Exception while trying to close database connection", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
deleted file mode 100644
index 9c29f8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * {@link OutputFormat} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
- private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
-
- private Connection connection;
- private PhoenixPigConfiguration config;
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
- }
-
- /**
- * TODO Implement {@link OutputCommitter} to rollback in case of task failure
- */
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new PhoenixOutputCommitter(this);
- }
-
- @Override
- public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- try {
- return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * This method creates a database connection. A single instance is created
- * and passed around for re-use.
- *
- * @param configuration
- * @return
- * @throws IOException
- */
- synchronized Connection getConnection(Configuration configuration) throws IOException {
- if (connection != null) {
- return connection;
- }
-
- config = new PhoenixPigConfiguration(configuration);
- try {
- LOG.info("Initializing new Phoenix connection...");
- connection = config.getConnection();
- LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
- return connection;
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
deleted file mode 100644
index 5063ed0..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- *
- */
-public class PhoenixRecord implements Writable {
-
- private final List<Object> values;
- private final ResourceFieldSchema[] fieldSchemas;
-
- public PhoenixRecord() {
- this(null);
- }
-
- public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
- this.values = new ArrayList<Object>();
- this.fieldSchemas = fieldSchemas;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- }
-
- public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
- for (int i = 0; i < columnMetadataList.size(); i++) {
- Object o = values.get(i);
- ColumnInfo columnInfo = columnMetadataList.get(i);
- byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- try {
- Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
- if (upsertValue != null) {
- statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
- } else {
- statement.setNull(i + 1, columnInfo.getSqlType());
- }
- } catch (RuntimeException re) {
- throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
- ,columnInfo.toString(),re.getMessage()),re);
-
- }
- }
-
- statement.execute();
- }
-
- public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
- Preconditions.checkNotNull(rs);
- Preconditions.checkArgument(noOfColumns > 0, "No of arguments passed is <= 0");
- values.clear();
- for(int i = 1 ; i <= noOfColumns ; i++) {
- Object obj = rs.getObject(i);
- values.add(obj);
- }
- }
-
- public void add(Object value) {
- values.add(value);
- }
-
- private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
- PDataType pDataType = PDataType.fromTypeId(sqlType);
-
- return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
- }
-
- public List<Object> getValues() {
- return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
deleted file mode 100644
index f6808a8..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * RecordReader that process the scan and returns PhoenixRecord
- *
- */
-public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
- private final QueryPlan queryPlan;
- private final int columnsCount;
- private NullWritable key = NullWritable.get();
- private PhoenixRecord value = null;
- private ResultIterator resultIterator = null;
- private PhoenixResultSet resultSet;
-
- public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
-
- Preconditions.checkNotNull(pConfiguration);
- Preconditions.checkNotNull(qPlan);
- this.phoenixConfiguration = pConfiguration;
- this.queryPlan = qPlan;
- this.columnsCount = phoenixConfiguration.getSelectColumnsCount();
- }
-
- @Override
- public void close() throws IOException {
- if(resultIterator != null) {
- try {
- resultIterator.close();
- } catch (SQLException e) {
- LOG.error(" Error closing resultset.");
- }
- }
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
- final List<Scan> scans = pSplit.getScans();
- try {
- List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
- for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
- PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
- iterators.add(peekingResultIterator);
- }
- ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
- if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
- iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
- }
- this.resultIterator = iterator;
- this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
- Throwables.propagate(e);
- }
-
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (key == null) {
- key = NullWritable.get();
- }
- if (value == null) {
- value = new PhoenixRecord();
- }
- Preconditions.checkNotNull(this.resultSet);
- try {
- if(!resultSet.next()) {
- return false;
- }
- value.read(resultSet,columnsCount);
- return true;
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
- Throwables.propagate(e);
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
deleted file mode 100644
index c980a38..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- *
- * {@link RecordWriter} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
-
- private long numRecords = 0;
-
- private final Connection conn;
- private final PreparedStatement statement;
- private final PhoenixPigConfiguration config;
- private final long batchSize;
-
- public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
- this.conn = conn;
- this.config = config;
- this.batchSize = config.getBatchSize();
- this.statement = this.conn.prepareStatement(config.getUpsertStatement());
- }
-
-
- /**
- * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}.
- *
- */
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- }
-
- @Override
- public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {
- try {
- record.write(statement, config.getColumnMetadataList());
- numRecords++;
-
- if (numRecords % batchSize == 0) {
- LOG.debug("commit called on a batch of size : " + batchSize);
- conn.commit();
- }
- } catch (SQLException e) {
- throw new IOException("Exception while committing to database.", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
deleted file mode 100644
index 3ea9b5b..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import java.util.List;
-
-import org.apache.phoenix.util.ColumnInfo;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- *
- * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
- *
- */
-public final class ColumnInfoToStringEncoderDecoder {
-
- private static final String COLUMN_INFO_DELIMITER = "|";
-
- private ColumnInfoToStringEncoderDecoder() {
-
- }
-
- public static String encode(List<ColumnInfo> columnInfos) {
- Preconditions.checkNotNull(columnInfos);
- return Joiner.on(COLUMN_INFO_DELIMITER).
- skipNulls().join(columnInfos);
- }
-
- public static List<ColumnInfo> decode(final String columnInfoStr) {
- Preconditions.checkNotNull(columnInfoStr);
- List<ColumnInfo> columnInfos = Lists.newArrayList(
- Iterables.transform(
- Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
- new Function<String, ColumnInfo>() {
- @Override
- public ColumnInfo apply(String colInfo) {
- if (colInfo.isEmpty()) {
- return null;
- }
- return ColumnInfo.fromString(colInfo);
- }
- }));
- return columnInfos;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index 9f8a5e4..4f7d776 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -25,8 +25,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
@@ -46,19 +47,20 @@ public final class PhoenixPigSchemaUtil {
private PhoenixPigSchemaUtil() {
}
- public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
final ResourceSchema schema = new ResourceSchema();
try {
- List<ColumnInfo> columns = null;
- if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) {
- final String sqlQuery = phoenixConfiguration.getSelectStatement();
- Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
- final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
- columns = function.apply(sqlQuery);
- } else {
- columns = phoenixConfiguration.getSelectColumnMetadataList();
- }
+ List<ColumnInfo> columns = null;
+ final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
+ if(SchemaType.QUERY.equals(schemaType)) {
+ final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
+ final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
+ columns = function.apply(sqlQuery);
+ } else {
+ columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
+ }
ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
int i = 0;
for(ColumnInfo cinfo : columns) {
@@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil {
}
return schema;
-
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index 1b3a90a..f0148a6 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -26,16 +26,16 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
/**
@@ -46,21 +46,20 @@ import com.google.common.collect.Lists;
public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
- private PhoenixPigConfiguration phoenixConfiguration;
+ private final Configuration configuration;
- public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
- Preconditions.checkNotNull(phoenixConfiguration);
- this.phoenixConfiguration = phoenixConfiguration;
+ public QuerySchemaParserFunction(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ this.configuration = configuration;
}
@Override
public Pair<String, String> apply(final String selectStatement) {
Preconditions.checkNotNull(selectStatement);
Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
- Preconditions.checkNotNull(this.phoenixConfiguration);
Connection connection = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
@@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
return new Pair<String, String>(tableName, columnsAsStr);
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error(" Error closing connection ");
+ throw new RuntimeException(sqle);
}
}
}
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 52f646c..3ed35bb 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -26,60 +26,59 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.util.ColumnInfo;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> {
-
- private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
+
+ private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
+ private final Configuration configuration;
- public SqlQueryToColumnInfoFunction(
- final PhoenixPigConfiguration phoenixPigConfiguration) {
- super();
- this.phoenixConfiguration = phoenixPigConfiguration;
- }
+ public SqlQueryToColumnInfoFunction(final Configuration configuration) {
+ this.configuration = configuration;
+ }
- @Override
- public List<ColumnInfo> apply(String sqlQuery) {
- Preconditions.checkNotNull(sqlQuery);
- Connection connection = null;
- List<ColumnInfo> columnInfos = null;
+ @Override
+ public List<ColumnInfo> apply(String sqlQuery) {
+ Preconditions.checkNotNull(sqlQuery);
+ Connection connection = null;
+ List<ColumnInfo> columnInfos = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() {
- @Override
- public ColumnInfo apply(final ColumnProjector columnProjector) {
- return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
- }
-
+ @Override
+ public ColumnInfo apply(final ColumnProjector columnProjector) {
+ return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
+ }
+
});
- } catch (SQLException e) {
+ } catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error("Error closing connection!!");
+ throw new RuntimeException(sqle);
}
}
}
- return columnInfos;
- }
+ return columnInfos;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 1cdd66d..1da2d01 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.phoenix.schema.PDataType;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -240,7 +240,7 @@ public final class TypeUtil {
* @return
* @throws IOException
*/
- public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+ public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
List<Object> columnValues = record.getValues();
if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
new file mode 100644
index 0000000..a7399c9
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link DBWritable} representing a Phoenix record (it does not implement Hadoop's {@link Writable}). This class
+ * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
+ * b) reads the column values from the {@link ResultSet}
+ * Field schemas and column metadata start out unset; they are assigned via the setters or {@link #newInstance}.
+ */
+public class PhoenixPigDBWritable implements DBWritable {
+
+ private final List<Object> values; // positional: write() pairs values.get(i) with columnMetadataList.get(i)
+ private ResourceFieldSchema[] fieldSchemas; // optional; when null, write() derives the Pig type from each value
+ private List<ColumnInfo> columnMetadataList; // write() iterates over this list; null until explicitly set
+ /** Creates a writable with an empty value list; field schemas and column metadata are left unset. */
+ public PhoenixPigDBWritable() {
+ this.values = new ArrayList<Object>();
+ }
+ /** Converts each stored value for its column's SQL type and binds it, or SQL NULL, at parameter index i + 1. */
+ @Override
+ public void write(PreparedStatement statement) throws SQLException {
+ for (int i = 0; i < columnMetadataList.size(); i++) { // NOTE(review): NPE here if column metadata was never set
+ Object o = values.get(i); // outside the try below, so a missing value is not wrapped with column context
+ ColumnInfo columnInfo = columnMetadataList.get(i);
+ byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType(); // no schema: use the value's own Pig type
+ try {
+ Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+ if (upsertValue != null) {
+ statement.setObject(i + 1, upsertValue, columnInfo.getSqlType()); // JDBC parameter indices are 1-based
+ } else {
+ statement.setNull(i + 1, columnInfo.getSqlType());
+ }
+ } catch (RuntimeException re) { // rethrown with the offending column in the message, original kept as cause
+ throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
+ ,columnInfo.toString(),re.getMessage()),re);
+
+ }
+ }
+ }
+ /** Appends a value; write() matches values to columns by position, so add them in column order. */
+ public void add(Object value) {
+ values.add(value);
+ }
+ /** Resolves the Phoenix data type for the given SQL type id and casts the Pig value to it through TypeUtil. */
+ private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+ PDataType pDataType = PDataType.fromTypeId(sqlType);
+ return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
+ }
+ /** Returns the internal value list itself (not a copy); readFields() clears and refills this same list. */
+ public List<Object> getValues() {
+ return values;
+ }
+ /** Replaces the stored values with one object per result set column, each read with getObject. */
+ @Override
+ public void readFields(final ResultSet rs) throws SQLException {
+ Preconditions.checkNotNull(rs);
+ final int noOfColumns = rs.getMetaData().getColumnCount();
+ values.clear(); // drop values from any previous read so the list holds a single row
+ for(int i = 1 ; i <= noOfColumns ; i++) { // JDBC column indices are 1-based
+ Object obj = rs.getObject(i);
+ values.add(obj);
+ }
+ }
+ /** Returns the Pig field schemas, or null if none were set. */
+ public ResourceFieldSchema[] getFieldSchemas() {
+ return fieldSchemas;
+ }
+ /** Sets the Pig field schemas consulted by write(); null makes write() infer the Pig type per value. */
+ public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
+ this.fieldSchemas = fieldSchemas;
+ }
+ /** Returns the column metadata list, or null if it has not been set. */
+ public List<ColumnInfo> getColumnMetadataList() {
+ return columnMetadataList;
+ }
+ /** Sets the column metadata that write() iterates over; write() dereferences it without a null check. */
+ public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+ this.columnMetadataList = columnMetadataList;
+ }
+ /** Factory method: creates an empty writable and assigns the given field schemas and column metadata. */
+ public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
+ final List<ColumnInfo> columnMetadataList) {
+ final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable ();
+ dbWritable.setFieldSchemas(fieldSchemas);
+ dbWritable.setColumnMetadataList(columnMetadataList);
+ return dbWritable;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
deleted file mode 100644
index ac254e6..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.junit.Assert.assertEquals;
-
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-
-/**
- * Tests for PhoenixPigConfiguration.
- *
- */
-public class PhoenixPigConfigurationTest {
-
-
- @Test
- public void testBasicConfiguration() throws SQLException {
- Configuration conf = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(conf);
- final String zkQuorum = "localhost";
- final String tableName = "TABLE";
- final long batchSize = 100;
- phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
- assertEquals(zkQuorum,phoenixConfiguration.getServer());
- assertEquals(tableName,phoenixConfiguration.getTableName());
- assertEquals(batchSize,phoenixConfiguration.getBatchSize());
- }
-
- /* @Test
- public void testConfiguration() throws SQLException {
- Configuration configuration = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- final String zkQuorum = "localhost";
- final String tableName = "TABLE";
- final long batchSize = 100;
- phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
- PhoenixPigConfigurationUtil util = Mockito.mock(PhoenixPigConfigurationUtil.class);
- phoenixConfiguration.setUtil(util);
- phoenixConfiguration.getColumnMetadataList();
- Mockito.verify(util).getUpsertColumnMetadataList(configuration, tableName);
- Mockito.verifyNoMoreInteractions(util);
-
- phoenixConfiguration.getSelectStatement();
- Mockito.verify(util).getSelectStatement(configuration, tableName);
- Mockito.verifyNoMoreInteractions(util);
- }
-
- @Test
- public void testWithSpy() throws SQLException {
- Configuration configuration = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- final String zkQuorum = "localhost";
- final String tableName = "TABLE";
- final long batchSize = 100;
- phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
- phoenixConfiguration.setSelectStatement("SELECT 1 from TABLE");
- PhoenixPigConfigurationUtil util = new PhoenixPigConfigurationUtil();
- PhoenixPigConfigurationUtil spied = Mockito.spy(util);
- phoenixConfiguration.setUtil(spied);
-
- phoenixConfiguration.getSelectStatement();
- Mockito.verify(spied,Mockito.times(1)).getSelectStatement(configuration, tableName);
- Mockito.verifyNoMoreInteractions(util);
- }*/
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
deleted file mode 100644
index 9777bb5..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
- */
-public class ColumnInfoToStringEncoderDecoderTest {
-
- @Test
- public void testEncode() {
- final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
- assertEquals(columnInfo.toString(),encodedColumnInfo);
- }
-
- @Test
- public void testDecode() {
- final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
- assertEquals(columnInfo.toString(),encodedColumnInfo);
- }
-
- @Test
- public void testEncodeDecodeWithNulls() {
- final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final ColumnInfo columnInfo2 = null;
- final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
- final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- assertEquals(1,decodedColumnInfo.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 310128c..7a861b9 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -29,8 +29,10 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
@@ -54,9 +56,11 @@ public class PhoenixPigSchemaUtilTest {
@Test
public void testSchema() throws SQLException, IOException {
- final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+ final Configuration configuration = mock(Configuration.class);
final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,NAME_COLUMN);
- when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SCHEMA_TYPE)).thenReturn(SchemaType.TABLE.name());
final ResourceSchema actual = PhoenixPigSchemaUtil.getResourceSchema(configuration);
// expected schema.
@@ -75,9 +79,10 @@ public class PhoenixPigSchemaUtilTest {
@Test(expected=IllegalDataException.class)
public void testUnSupportedTypes() throws SQLException, IOException {
- final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+ final Configuration configuration = mock(Configuration.class);
final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,LOCATION_COLUMN);
- when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
PhoenixPigSchemaUtil.getResourceSchema(configuration);
fail("We currently don't support Array type yet. WIP!!");
}