You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/09 03:09:13 UTC
[3/3] phoenix git commit: PHOENIX-1454 - Map Reduce over Phoenix
tables
PHOENIX-1454 - Map Reduce over Phoenix tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f84e5da3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f84e5da3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f84e5da3
Branch: refs/heads/master
Commit: f84e5da33c21a728e73924c07506dd63e4621872
Parents: ceded22
Author: ravimagham <ra...@apache.org>
Authored: Mon Dec 8 18:06:38 2014 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Mon Dec 8 18:06:38 2014 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixInputFormat.java | 117 +++++++
.../phoenix/mapreduce/PhoenixInputSplit.java | 129 +++++++
.../mapreduce/PhoenixOutputCommitter.java | 54 +++
.../phoenix/mapreduce/PhoenixOutputFormat.java | 62 ++++
.../phoenix/mapreduce/PhoenixRecordReader.java | 140 ++++++++
.../phoenix/mapreduce/PhoenixRecordWriter.java | 91 +++++
.../util/ColumnInfoToStringEncoderDecoder.java | 65 ++++
.../phoenix/mapreduce/util/ConnectionUtil.java | 49 +++
.../util/PhoenixConfigurationUtil.java | 299 ++++++++++++++++
.../mapreduce/util/PhoenixMapReduceUtil.java | 99 ++++++
.../org/apache/phoenix/util/PhoenixRuntime.java | 6 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 22 +-
.../ColumnInfoToStringEncoderDecoderTest.java | 61 ++++
.../util/PhoenixConfigurationUtilTest.java | 124 +++++++
.../org/apache/phoenix/util/QueryUtilTest.java | 2 +-
.../phoenix/pig/PhoenixPigConfigurationIT.java | 109 ------
.../apache/phoenix/pig/PhoenixHBaseLoader.java | 45 +--
.../apache/phoenix/pig/PhoenixHBaseStorage.java | 218 ++++++------
.../phoenix/pig/PhoenixPigConfiguration.java | 340 -------------------
.../phoenix/pig/hadoop/PhoenixInputFormat.java | 142 --------
.../phoenix/pig/hadoop/PhoenixInputSplit.java | 134 --------
.../pig/hadoop/PhoenixOutputCommitter.java | 111 ------
.../phoenix/pig/hadoop/PhoenixOutputFormat.java | 94 -----
.../phoenix/pig/hadoop/PhoenixRecord.java | 112 ------
.../phoenix/pig/hadoop/PhoenixRecordReader.java | 142 --------
.../phoenix/pig/hadoop/PhoenixRecordWriter.java | 83 -----
.../util/ColumnInfoToStringEncoderDecoder.java | 69 ----
.../phoenix/pig/util/PhoenixPigSchemaUtil.java | 27 +-
.../pig/util/QuerySchemaParserFunction.java | 21 +-
.../pig/util/SqlQueryToColumnInfoFunction.java | 51 ++-
.../org/apache/phoenix/pig/util/TypeUtil.java | 4 +-
.../pig/writable/PhoenixPigDBWritable.java | 121 +++++++
.../pig/PhoenixPigConfigurationTest.java | 86 -----
.../ColumnInfoToStringEncoderDecoderTest.java | 61 ----
.../pig/util/PhoenixPigSchemaUtilTest.java | 17 +-
.../pig/util/QuerySchemaParserFunctionTest.java | 22 +-
.../util/SqlQueryToColumnInfoFunctionTest.java | 22 +-
37 files changed, 1642 insertions(+), 1709 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
new file mode 100644
index 0000000..7c67c2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link InputFormat} implementation for Phoenix. Turns the configured SELECT
+ * statement into an optimized {@link QueryPlan} and produces one
+ * {@link InputSplit} per batch of parallel scans in that plan.
+ *
+ * @param <T> the {@link DBWritable} type each row is deserialized into
+ */
+public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable,T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+
+ /**
+ * instantiated by framework
+ */
+ public PhoenixInputFormat() {
+ }
+
+ @Override
+ public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ final Configuration configuration = context.getConfiguration();
+ // Re-derive the query plan on the task side; the plan itself is not
+ // serialized with the split -- only the scans are (see PhoenixInputSplit).
+ final QueryPlan queryPlan = getQueryPlan(context,configuration);
+ @SuppressWarnings("unchecked")
+ final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+ return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+ }
+
+
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ final Configuration configuration = context.getConfiguration();
+ final QueryPlan queryPlan = getQueryPlan(context,configuration);
+ final List<KeyRange> allSplits = queryPlan.getSplits();
+ final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+ return splits;
+ }
+
+ // Builds one PhoenixInputSplit per scan batch of the plan. The key-range
+ // list is only used here for null-checking and initial sizing; the scan
+ // batches from qplan.getScans() are what actually drive the splits.
+ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+ Preconditions.checkNotNull(qplan);
+ Preconditions.checkNotNull(splits);
+ final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+ for (List<Scan> scans : qplan.getScans()) {
+ psplits.add(new PhoenixInputSplit(scans));
+ }
+ return psplits;
+ }
+
+ /**
+ * Returns the query plan associated with the select query.
+ * @param context job context; only null-checked here
+ * @param configuration supplies the connection info and the SELECT statement
+ * @return the optimized {@link QueryPlan}, already initialized for parallel scans
+ * @throws IOException
+ * @throws SQLException
+ */
+ private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+ Preconditions.checkNotNull(context);
+ try{
+ // NOTE(review): neither this connection nor the statement below is ever
+ // closed; presumably the plan's iterators need them alive for the task's
+ // lifetime -- TODO confirm ownership/lifecycle.
+ final Connection connection = ConnectionUtil.getConnection(configuration);
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(selectStatement);
+ final Statement statement = connection.createStatement();
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator();
+ return queryPlan;
+ } catch(Exception exception) {
+ LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+ throw new RuntimeException(exception);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
new file mode 100644
index 0000000..b222fc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Input split that carries the batch of {@link Scan}s assigned to one mapper.
+ * The overall {@link KeyRange} covered by the scans is derived locally (see
+ * {@link #init()}) and is the basis for equality/hashing; only the scans are
+ * serialized (as protobuf) across the wire.
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+ private List<Scan> scans;
+ private KeyRange keyRange;
+
+ /**
+ * No Arg constructor, required by the MapReduce framework for
+ * deserialization; fields are populated by {@link #readFields(DataInput)}.
+ */
+ public PhoenixInputSplit() {
+ }
+
+ /**
+ * @param scans non-null, non-empty batch of scans this split covers
+ */
+ public PhoenixInputSplit(final List<Scan> scans) {
+ Preconditions.checkNotNull(scans);
+ Preconditions.checkState(!scans.isEmpty());
+ this.scans = scans;
+ init();
+ }
+
+ public List<Scan> getScans() {
+ return scans;
+ }
+
+ public KeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ // Derives the covering key range from the first scan's start row to the
+ // last scan's stop row; assumes the scans are ordered by row key.
+ private void init() {
+ this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int count = WritableUtils.readVInt(input);
+ scans = Lists.newArrayListWithExpectedSize(count);
+ for (int i = 0; i < count; i++) {
+ byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
+ input.readFully(protoScanBytes);
+ ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ scans.add(scan);
+ }
+ init();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ Preconditions.checkNotNull(scans);
+ WritableUtils.writeVInt(output, scans.size());
+ for (Scan scan : scans) {
+ ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
+ byte[] protoScanBytes = protoScan.toByteArray();
+ WritableUtils.writeVInt(output, protoScanBytes.length);
+ output.write(protoScanBytes);
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ // Null-safe to stay consistent with equals(): a split created via the
+ // no-arg constructor (before readFields) has a null keyRange and must
+ // not throw NPE here.
+ result = prime * result + ((keyRange == null) ? 0 : keyRange.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) { return true; }
+ if (obj == null) { return false; }
+ if (!(obj instanceof PhoenixInputSplit)) { return false; }
+ PhoenixInputSplit other = (PhoenixInputSplit)obj;
+ if (keyRange == null) {
+ if (other.keyRange != null) { return false; }
+ } else if (!keyRange.equals(other.keyRange)) { return false; }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..ffee5c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A no-op {@link OutputCommitter}: Phoenix writes go straight to the database
+ * through JDBC, so there is no temporary task output to set up, commit, or
+ * abort. Only {@link #needsTaskCommit(TaskAttemptContext)} answers true so the
+ * framework still invokes the (empty) commit hook.
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+
+ public PhoenixOutputCommitter() {}
+
+ /** Nothing to prepare at the job level. */
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ /** Nothing to prepare per task. */
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ }
+
+ /** Always report true; the commit itself is a no-op. */
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ return true;
+ }
+
+ /** No task output to promote. */
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ }
+
+ /** No task output to discard. */
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..e55b977
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix. Records are written to the
+ * database via a {@link PhoenixRecordWriter}; no filesystem output is produced.
+ *
+ * @param <T> the {@link DBWritable} type being written
+ */
+public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
+ private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+ /** No output specs to validate; the target table comes from configuration. */
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ }
+
+ /**
+ * Returns a no-op committer; writes are committed by the record writer itself.
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new PhoenixOutputCommitter();
+ }
+
+ @Override
+ public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ return new PhoenixRecordWriter<T>(context.getConfiguration());
+ } catch (SQLException e) {
+ // Log with the throwable so the stack trace is preserved, not just the message.
+ LOG.error("Error calling PhoenixRecordWriter " + e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..2c206ab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link RecordReader} implementation that iterates over the records of one
+ * {@link PhoenixInputSplit}, exposing each row through a {@link PhoenixResultSet}
+ * and deserializing it into the configured {@link DBWritable} value class.
+ */
+public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+ private final Configuration configuration;
+ private final QueryPlan queryPlan;
+ // Key is always the NullWritable singleton; value is lazily created in nextKeyValue().
+ private NullWritable key = NullWritable.get();
+ private T value = null;
+ private Class<T> inputClass;
+ private ResultIterator resultIterator = null;
+ private PhoenixResultSet resultSet;
+
+ public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(queryPlan);
+ this.inputClass = inputClass;
+ this.configuration = configuration;
+ this.queryPlan = queryPlan;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(resultIterator != null) {
+ try {
+ resultIterator.close();
+ } catch (SQLException e) {
+ LOG.error(" Error closing resultset.");
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public T getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ // Progress is not tracked; always reports 0 (row count is unknown up front).
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+ final List<Scan> scans = pSplit.getScans();
+ try {
+ // Wrap each scan in a look-ahead iterator, then concatenate them so the
+ // split's scans are consumed sequentially as one logical result stream.
+ List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ for (Scan scan : scans) {
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+ iterators.add(peekingResultIterator);
+ }
+ ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+ // If the query references sequences, layer on the sequence iterator so
+ // sequence values are resolved per row.
+ if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+ iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+ }
+ this.resultIterator = iterator;
+ this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+ // propagate() always rethrows (wrapped as RuntimeException), so this
+ // method never continues past here with a half-initialized reader.
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (key == null) {
+ key = NullWritable.get();
+ }
+ // Reuse a single value instance across rows, per common Hadoop practice.
+ if (value == null) {
+ value = ReflectionUtils.newInstance(inputClass, this.configuration);
+ }
+ Preconditions.checkNotNull(this.resultSet);
+ try {
+ if(!resultSet.next()) {
+ return false;
+ }
+ value.readFields(resultSet);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
new file mode 100644
index 0000000..4d26bf4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+
+/**
+ * Default {@link RecordWriter} implementation for Phoenix. Buffers upserts in
+ * a JDBC batch and commits every {@code batchSize} records; the remainder is
+ * flushed and committed in {@link #close(TaskAttemptContext)}.
+ *
+ */
+public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<NullWritable, T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+
+ private final Connection conn;
+ private final PreparedStatement statement;
+ private final long batchSize;
+ private long numRecords = 0;
+
+ /**
+ * @param configuration supplies the connection info, batch size, and the
+ * UPSERT statement to prepare
+ * @throws SQLException if the connection or statement cannot be created
+ */
+ public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
+ this.conn = ConnectionUtil.getConnection(configuration);
+ this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+ final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+ this.statement = this.conn.prepareStatement(upsertQuery);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ // Flush any records still pending since the last full batch.
+ statement.executeBatch();
+ conn.commit();
+ } catch (SQLException e) {
+ LOG.error("SQLException while performing the commit for the task.");
+ throw new RuntimeException(e);
+ } finally {
+ // Always release JDBC resources, even if the final commit failed.
+ try {
+ statement.close();
+ conn.close();
+ }
+ catch (SQLException ex) {
+ LOG.error("SQLException while closing the connection for the task.");
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public void write(NullWritable n, T record) throws IOException, InterruptedException {
+ try {
+ record.write(statement);
+ numRecords++;
+ statement.addBatch();
+ // Execute and commit once a full batch has accumulated.
+ if (numRecords % batchSize == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("commit called on a batch of size : " + batchSize);
+ }
+ statement.executeBatch();
+ conn.commit();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Exception while committing to database.", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..ec52fba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
+ * Used to round-trip column metadata through a Configuration property.
+ */
+public class ColumnInfoToStringEncoderDecoder {
+
+ // NOTE(review): encode/decode break if ColumnInfo.toString() can itself
+ // contain this character -- TODO confirm against ColumnInfo's format.
+ private static final String COLUMN_INFO_DELIMITER = "|";
+
+ // Static-only utility; not meant to be instantiated.
+ private ColumnInfoToStringEncoderDecoder() {
+
+ }
+
+ // Joins the ColumnInfo string forms; null entries are silently dropped
+ // (skipNulls), mirroring decode's omitEmptyStrings on the way back.
+ public static String encode(List<ColumnInfo> columnInfos) {
+ Preconditions.checkNotNull(columnInfos);
+ return Joiner.on(COLUMN_INFO_DELIMITER)
+ .skipNulls()
+ .join(columnInfos);
+ }
+
+ // Splits on the literal delimiter (Splitter.on(String) is not a regex)
+ // and parses each piece via ColumnInfo.fromString.
+ public static List<ColumnInfo> decode(final String columnInfoStr) {
+ Preconditions.checkNotNull(columnInfoStr);
+ List<ColumnInfo> columnInfos = Lists.newArrayList(
+ Iterables.transform(
+ Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
+ new Function<String, ColumnInfo>() {
+ @Override
+ public ColumnInfo apply(String colInfo) {
+ return ColumnInfo.fromString(colInfo);
+ }
+ }));
+ return columnInfos;
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
new file mode 100644
index 0000000..0864cba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class to return a {@link Connection} .
+ */
+public class ConnectionUtil {
+
+ // Static-only utility class; prevent instantiation.
+ private ConnectionUtil() {
+ }
+
+ /**
+ * Returns the {@link Connection} from Configuration. The caller owns the
+ * connection and is responsible for closing it.
+ * @param configuration must contain the ZooKeeper quorum under
+ * {@link HConstants#ZOOKEEPER_QUORUM}
+ * @return a new Phoenix JDBC connection
+ * @throws SQLException
+ */
+ public static Connection getConnection(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final Properties props = new Properties();
+ final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
+ return conn;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
new file mode 100644
index 0000000..83a606b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * A utility class to set properties on the {@link Configuration} instance.
+ * Used as part of Map Reduce job configuration.
+ *
+ */
+public final class PhoenixConfigurationUtil {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+
+ public static final String UPSERT_COLUMNS = "phoenix.upsert.columns";
+
+ public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+
+ public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
+
+ public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+
+ public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+
+ public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
+
+ public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
+
+ public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
+
+ public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+
+ public static final String INPUT_TABLE_NAME = "phoenix.input.table.name" ;
+
+ public static final String INPUT_TABLE_CONDITIONS = "phoenix.input.table.conditions" ;
+
+ public static final String OUTPUT_TABLE_NAME = "phoenix.output.table.name" ;
+
+ public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+
+ public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
+
+ public static final String INPUT_CLASS = "phoenix.input.class";
+
+ public enum SchemaType {
+ TABLE,
+ QUERY;
+ }
+
+ private PhoenixConfigurationUtil(){
+
+ }
+ /**
+ *
+ * @param tableName
+ */
+ public static void setInputTableName(final Configuration configuration, final String tableName) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ configuration.set(INPUT_TABLE_NAME, tableName);
+ }
+
+ public static void setInputTableConditions(final Configuration configuration, final String conditions) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(conditions);
+ configuration.set(INPUT_TABLE_CONDITIONS, conditions);
+ }
+
+ public static void setSelectColumnNames(final Configuration configuration,final String[] columns) {
+ Preconditions.checkNotNull(configuration);
+ final String selectColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+ configuration.set(SELECT_COLUMNS, selectColumnNames);
+ }
+
+ public static void setSelectColumnNames(final Configuration configuration,final String columns) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(SELECT_COLUMNS, columns);
+ }
+
+ public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
+ Preconditions.checkNotNull(configuration);
+ configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
+ }
+
+ public static void setInputQuery(final Configuration configuration, final String inputQuery) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(inputQuery);
+ configuration.set(SELECT_STATEMENT, inputQuery);
+ }
+
+ public static void setSchemaType(Configuration configuration, final SchemaType schemaType) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(SCHEMA_TYPE, schemaType.name());
+ }
+
+ public static void setOutputTableName(final Configuration configuration, final String tableName) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(tableName);
+ configuration.set(OUTPUT_TABLE_NAME, tableName);
+ }
+
+ public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) {
+ Preconditions.checkNotNull(configuration);
+ final String upsertColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+ configuration.set(UPSERT_COLUMNS, upsertColumnNames);
+ }
+
+ public static void setUpsertColumnNames(final Configuration configuration,final String columns) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(UPSERT_COLUMNS, columns);
+ }
+
+
+ public static void setBatchSize(final Configuration configuration, final Long batchSize) {
+ Preconditions.checkNotNull(configuration);
+ configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+ }
+
+ public static Class<?> getInputClass(final Configuration configuration) {
+ return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
+ }
+ public static SchemaType getSchemaType(final Configuration configuration) {
+ final String schemaTp = configuration.get(SCHEMA_TYPE);
+ Preconditions.checkNotNull(schemaTp);
+ return SchemaType.valueOf(schemaTp);
+ }
+
+ public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final String tableName = getOutputTableName(configuration);
+ Preconditions.checkNotNull(tableName);
+ final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+ if(isNotEmpty(columnInfoStr)) {
+ return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+ }
+ final Connection connection = ConnectionUtil.getConnection(configuration);
+ String upsertColumns = configuration.get(UPSERT_COLUMNS);
+ List<String> upsertColumnList = null;
+ if(isNotEmpty(upsertColumns)) {
+ final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+ upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+ LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+ ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+ ));
+ }
+ List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+ // we put the encoded column infos in the Configuration for reusability.
+ configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+ connection.close();
+ return columnMetadataList;
+ }
+
+ public static String getUpsertStatement(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final String tableName = getOutputTableName(configuration);
+ Preconditions.checkNotNull(tableName);
+ String upsertStmt = configuration.get(UPSERT_STATEMENT);
+ if(isNotEmpty(upsertStmt)) {
+ return upsertStmt;
+ }
+ final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+ final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration);
+ if (useUpsertColumns) {
+ // Generating UPSERT statement with column name information.
+ upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+ LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
+ } else {
+ // Generating UPSERT statement without column name information.
+ upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+ LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+ }
+ configuration.set(UPSERT_STATEMENT, upsertStmt);
+ return upsertStmt;
+
+ }
+
+ public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+ if(isNotEmpty(columnInfoStr)) {
+ return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+ }
+ final String tableName = getInputTableName(configuration);
+ Preconditions.checkNotNull(tableName);
+ final Connection connection = ConnectionUtil.getConnection(configuration);
+ final List<String> selectColumnList = getSelectColumnList(configuration);
+ final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+ // we put the encoded column infos in the Configuration for reusability.
+ configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+ connection.close();
+ return columnMetadataList;
+ }
+
+ private static List<String> getSelectColumnList(
+ final Configuration configuration) {
+ String selectColumns = configuration.get(SELECT_COLUMNS);
+ List<String> selectColumnList = null;
+ if(isNotEmpty(selectColumns)) {
+ final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+ selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+ LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+ ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+ ));
+ }
+ return selectColumnList;
+ }
+
+ public static String getSelectStatement(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ String selectStmt = configuration.get(SELECT_STATEMENT);
+ if(isNotEmpty(selectStmt)) {
+ return selectStmt;
+ }
+ final String tableName = getInputTableName(configuration);
+ Preconditions.checkNotNull(tableName);
+ final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration);
+ final String conditions = configuration.get(INPUT_TABLE_CONDITIONS);
+ selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList, conditions);
+ LOG.info("Select Statement: "+ selectStmt);
+ configuration.set(SELECT_STATEMENT, selectStmt);
+ return selectStmt;
+ }
+
+ public static long getBatchSize(final Configuration configuration) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+ if(batchSize <= 0) {
+ Connection conn = ConnectionUtil.getConnection(configuration);
+ batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+ conn.close();
+ }
+ configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+ return batchSize;
+ }
+
+ public static int getSelectColumnsCount(Configuration configuration,
+ String tableName) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ final String schemaTp = configuration.get(SCHEMA_TYPE);
+ final SchemaType schemaType = SchemaType.valueOf(schemaTp);
+ int count = 0;
+ if(SchemaType.QUERY.equals(schemaType)) {
+ List<String> selectedColumnList = getSelectColumnList(configuration);
+ count = selectedColumnList == null ? 0 : selectedColumnList.size();
+ } else {
+ List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration);
+ count = columnInfos == null ? 0 : columnInfos.size();
+ }
+ return count;
+ }
+
+ public static String getInputTableName(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.get(INPUT_TABLE_NAME);
+ }
+
+ public static String getOutputTableName(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ return configuration.get(OUTPUT_TABLE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
new file mode 100644
index 0000000..f1a7f5a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+
+/**
+ * Utility class for setting Configuration parameters for the Map Reduce job
+ */
+public final class PhoenixMapReduceUtil {
+
+ private PhoenixMapReduceUtil() {
+
+ }
+
+ /**
+ *
+ * @param job
+ * @param inputClass DBWritable class
+ * @param tableName Input table name
+ * @param conditions Condition clause to be added to the WHERE clause.
+ * @param fieldNames fields being projected for the SELECT query.
+ */
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
+ job.setInputFormatClass(PhoenixInputFormat.class);
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
+ PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+ PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+ }
+
+ /**
+ *
+ * @param job
+ * @param inputClass DBWritable class
+ * @param tableName Input table name
+ * @param inputQuery Select query.
+ */
+ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
+ job.setInputFormatClass(PhoenixInputFormat.class);
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+ PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+ PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+ }
+
+ /**
+ *
+ * @param job
+ * @param tableName Output table
+ * @param columns List of columns separated by ,
+ */
+ public static void setOutput(final Job job, final String tableName,final String columns) {
+ job.setOutputFormatClass(PhoenixOutputFormat.class);
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns);
+ }
+
+
+ /**
+ *
+ * @param job
+ * @param tableName Output table
+ * @param fieldNames fields
+ */
+ public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
+ job.setOutputFormatClass(PhoenixOutputFormat.class);
+ final Configuration configuration = job.getConfiguration();
+ PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 9f76e5a..0e2bae3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -325,8 +325,8 @@ public class PhoenixRuntime {
if (columns == null) {
// use all columns in the table
for(PColumn pColumn : table.getColumns()) {
- int sqlType = pColumn.getDataType().getResultSetSqlType();
- columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
+ int sqlType = pColumn.getDataType().getSqlType();
+ columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
}
} else {
// Leave "null" as indication to skip b/c it doesn't exist
@@ -405,7 +405,7 @@ public class PhoenixRuntime {
if (pColumn==null) {
throw new SQLException("pColumn must not be null.");
}
- int sqlType = pColumn.getDataType().getResultSetSqlType();
+ int sqlType = pColumn.getDataType().getSqlType();
ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
return columnInfo;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 8739c6d..88b68b0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -144,29 +144,33 @@ public final class QueryUtil {
*
* @param fullTableName name of the table for which the select statement needs to be created.
* @param columnInfos list of columns to be projected in the select statement.
+ * @param conditions The condition clause to be added to the WHERE condition
* @return Select Query
*/
- public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) {
+ public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
if(columnInfos == null || columnInfos.isEmpty()) {
throw new IllegalArgumentException("At least one column must be provided");
}
// escape the table name to ensure it is case sensitive.
final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName);
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT ");
for (ColumnInfo cinfo : columnInfos) {
if (cinfo != null) {
String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
- sb.append(fullColumnName);
- sb.append(",");
+ query.append(fullColumnName);
+ query.append(",");
}
}
// Remove the trailing comma
- sb.setLength(sb.length() - 1);
- sb.append(" FROM ");
- sb.append(escapedFullTableName);
- return sb.toString();
+ query.setLength(query.length() - 1);
+ query.append(" FROM ");
+ query.append(escapedFullTableName);
+ if(conditions != null && conditions.length() > 0) {
+ query.append(" WHERE (").append(conditions).append(")");
+ }
+ return query.toString();
}
public static String getUrl(String server) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..1004981
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+ @Test
+ public void testEncode() {
+ final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+ final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+ assertEquals(columnInfo.toString(),encodedColumnInfo);
+ }
+
+ @Test
+ public void testDecode() {
+ final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+ final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+ assertEquals(columnInfo.toString(),encodedColumnInfo);
+ }
+
+ @Test
+ public void testEncodeDecodeWithNulls() {
+ final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+ final ColumnInfo columnInfo2 = null;
+ final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
+ final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+ assertEquals(1,decodedColumnInfo.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
new file mode 100644
index 0000000..33c7531
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+/**
+ * Test for {@link PhoenixConfigurationUtil}
+ */
+public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
+
+ @Test
+ public void testUpsertStatement() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ final String tableName = "TEST_TABLE";
+ try {
+ String ddl = "CREATE TABLE "+ tableName +
+ " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+ conn.createStatement().execute(ddl);
+ final Configuration configuration = new Configuration ();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+ final String upserStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+ final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
+ assertEquals(expectedUpsertStatement, upserStatement);
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testSelectStatement() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ final String tableName = "TEST_TABLE";
+ try {
+ String ddl = "CREATE TABLE "+ tableName +
+ " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+ conn.createStatement().execute(ddl);
+ final Configuration configuration = new Configuration ();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
+ assertEquals(expectedSelectStatement, selectStatement);
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testSelectStatementForSpecificColumns() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ final String tableName = "TEST_TABLE";
+ try {
+ String ddl = "CREATE TABLE "+ tableName +
+ " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+ " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+ conn.createStatement().execute(ddl);
+ final Configuration configuration = new Configuration ();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ PhoenixConfigurationUtil.setSelectColumnNames(configuration, "A_BINARY");
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
+ assertEquals(expectedSelectStatement, selectStatement);
+ } finally {
+ conn.close();
+ }
+ }
+
+ @Test
+ public void testSelectStatementForArrayTypes() throws Exception {
+ Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+ final String tableName = "TEST_TABLE";
+ try {
+ String ddl = "CREATE TABLE "+ tableName +
+ " (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n";
+ conn.createStatement().execute(ddl);
+ final Configuration configuration = new Configuration ();
+ configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+ PhoenixConfigurationUtil.setSelectColumnNames(configuration,"ID,VCARRAY");
+ PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+ PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
+ assertEquals(expectedSelectStatement, selectStatement);
+ } finally {
+ conn.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 182eb56..33e3b5a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -64,7 +64,7 @@ public class QueryUtilTest {
public void testConstructSelectStatement() {
assertEquals(
"SELECT \"ID\",\"NAME\" FROM \"MYTAB\"",
- QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN)));
+ QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
deleted file mode 100644
index efbfbf8..0000000
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-
-
-public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT {
- private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-
- @Test
- public void testUpsertStatement() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- final String upserStatement = configuration.getUpsertStatement();
- final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
- assertEquals(expectedUpsertStatement, upserStatement);
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectStatement() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- final String selectStatement = configuration.getSelectStatement();
- final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
- assertEquals(expectedSelectStatement, selectStatement);
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectStatementForSpecificColumns() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- configuration.setSelectColumns("A_BINARY");
- final String selectStatement = configuration.getSelectStatement();
- final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
- assertEquals(expectedSelectStatement, selectStatement);
- } finally {
- conn.close();
- }
- }
-
- private PhoenixPigConfiguration newConfiguration(String tableName) {
- final Configuration configuration = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100);
- return phoenixConfiguration;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index d8bedf6..1218e82 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -28,18 +28,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
-import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
import org.apache.phoenix.pig.util.TableSchemaParserFunction;
import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
@@ -83,12 +86,12 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
private static final String PHOENIX_QUERY_SCHEME = "hbase://query/";
private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema";
- private PhoenixPigConfiguration config;
+ private Configuration config;
private String tableName;
private String selectQuery;
private String zkQuorum ;
- private PhoenixInputFormat inputFormat;
- private RecordReader<NullWritable, PhoenixRecord> reader;
+ private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
+ private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
private String contextSignature;
private ResourceSchema schema;
@@ -107,6 +110,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
final Configuration configuration = job.getConfiguration();
//explicitly turning off combining splits.
configuration.setBoolean("pig.noSplitCombination", true);
+ //to have phoenix working on a secured cluster
+ TableMapReduceUtil.initCredentials(job);
this.initializePhoenixPigConfiguration(location, configuration);
}
@@ -120,21 +125,22 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
if(this.config != null) {
return;
}
- this.config = new PhoenixPigConfiguration(configuration);
- this.config.setServerName(this.zkQuorum);
+ this.config = configuration;
+ this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
+ PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
Pair<String,String> pair = null;
try {
if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length());
final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction();
pair = parseFunction.apply(tableSchema);
- this.config.setSchemaType(SchemaType.TABLE);
+ PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.TABLE);
} else if (location.startsWith(PHOENIX_QUERY_SCHEME)) {
this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length());
final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config);
pair = queryParseFunction.apply(this.selectQuery);
- config.setSelectStatement(this.selectQuery);
- this.config.setSchemaType(SchemaType.QUERY);
+ PhoenixConfigurationUtil.setInputQuery(this.config, this.selectQuery);
+ PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.QUERY);
}
this.tableName = pair.getFirst();
final String selectedColumns = pair.getSecond();
@@ -142,9 +148,9 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) {
printUsage(location);
}
- this.config.setTableName(this.tableName);
+ PhoenixConfigurationUtil.setInputTableName(this.config, this.tableName);
if(!isEmpty(selectedColumns)) {
- this.config.setSelectColumns(selectedColumns);
+ PhoenixConfigurationUtil.setSelectColumnNames(this.config, selectedColumns);
}
} catch(IllegalArgumentException iae) {
printUsage(location);
@@ -160,7 +166,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
@Override
public InputFormat getInputFormat() throws IOException {
if(inputFormat == null) {
- inputFormat = new PhoenixInputFormat();
+ inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
+ PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
}
return inputFormat;
}
@@ -188,13 +195,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
public Tuple getNext() throws IOException {
try {
if(!reader.nextKeyValue()) {
- return null;
- }
- final PhoenixRecord phoenixRecord = reader.getCurrentValue();
- if(phoenixRecord == null) {
+ return null;
+ }
+ final PhoenixPigDBWritable record = reader.getCurrentValue();
+ if(record == null) {
return null;
}
- final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields());
+ final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
return tuple;
} catch (InterruptedException e) {
int errCode = 6018;