You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2014/12/09 03:03:12 UTC
[1/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables
Repository: phoenix
Updated Branches:
refs/heads/4.0 0de7863bc -> b70f3abd2
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
index 3daf4e1..27fd879 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/QuerySchemaParserFunctionTest.java
@@ -22,15 +22,12 @@ package org.apache.phoenix.pig.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -44,16 +41,14 @@ import com.google.common.base.Joiner;
*/
public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
- private PhoenixPigConfiguration phoenixConfiguration;
- private Connection conn;
+ private Configuration configuration;
private QuerySchemaParserFunction function;
@Before
public void setUp() throws SQLException {
- phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
- conn = DriverManager.getConnection(getUrl());
- Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
- function = new QuerySchemaParserFunction(phoenixConfiguration);
+ configuration = Mockito.mock(Configuration.class);
+ Mockito.when(configuration.get(HConstants.ZOOKEEPER_QUORUM)).thenReturn(getUrl());
+ function = new QuerySchemaParserFunction(configuration);
}
@Test(expected=RuntimeException.class)
@@ -101,9 +96,4 @@ public class QuerySchemaParserFunctionTest extends BaseConnectionlessQueryTest {
function.apply(selectQuery);
fail(" Function call successful despite passing an aggreagate query");
}
-
- @After
- public void tearDown() throws SQLException {
- conn.close();
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
index 1bd16e3..667d814 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunctionTest.java
@@ -19,16 +19,14 @@
*/
package org.apache.phoenix.pig.util;
-import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
import org.apache.phoenix.util.ColumnInfo;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -38,16 +36,14 @@ import com.google.common.collect.ImmutableList;
public class SqlQueryToColumnInfoFunctionTest extends BaseConnectionlessQueryTest {
- private PhoenixPigConfiguration phoenixConfiguration;
- private Connection conn;
+ private Configuration configuration;
private SqlQueryToColumnInfoFunction function;
@Before
public void setUp() throws SQLException {
- phoenixConfiguration = Mockito.mock(PhoenixPigConfiguration.class);
- conn = DriverManager.getConnection(getUrl());
- Mockito.when(phoenixConfiguration.getConnection()).thenReturn(conn);
- function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
+ configuration = Mockito.mock(Configuration.class);
+ Mockito.when(configuration.get(HConstants.ZOOKEEPER_QUORUM)).thenReturn(getUrl());
+ function = new SqlQueryToColumnInfoFunction(configuration);
}
@Test
@@ -66,10 +62,4 @@ public class SqlQueryToColumnInfoFunctionTest extends BaseConnectionlessQueryTe
Assert.assertEquals(expectedColumnInfos, actualColumnInfos);
}
-
- @After
- public void tearDown() throws SQLException {
- conn.close();
- }
-
}
[3/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables
Posted by ra...@apache.org.
PHOENIX-1454 Map Reduce over Phoenix tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b70f3abd
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b70f3abd
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b70f3abd
Branch: refs/heads/4.0
Commit: b70f3abd2f7e0d4393a5235a46de73f445ea51d1
Parents: 0de7863
Author: ravimagham <ra...@apache.org>
Authored: Mon Dec 8 18:02:57 2014 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Mon Dec 8 18:02:57 2014 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/PhoenixInputFormat.java | 117 +++++++
.../phoenix/mapreduce/PhoenixInputSplit.java | 129 +++++++
.../mapreduce/PhoenixOutputCommitter.java | 54 +++
.../phoenix/mapreduce/PhoenixOutputFormat.java | 62 ++++
.../phoenix/mapreduce/PhoenixRecordReader.java | 140 ++++++++
.../phoenix/mapreduce/PhoenixRecordWriter.java | 91 +++++
.../util/ColumnInfoToStringEncoderDecoder.java | 65 ++++
.../phoenix/mapreduce/util/ConnectionUtil.java | 49 +++
.../util/PhoenixConfigurationUtil.java | 299 ++++++++++++++++
.../mapreduce/util/PhoenixMapReduceUtil.java | 99 ++++++
.../org/apache/phoenix/util/PhoenixRuntime.java | 6 +-
.../java/org/apache/phoenix/util/QueryUtil.java | 22 +-
.../ColumnInfoToStringEncoderDecoderTest.java | 61 ++++
.../util/PhoenixConfigurationUtilTest.java | 124 +++++++
.../org/apache/phoenix/util/QueryUtilTest.java | 2 +-
.../phoenix/pig/PhoenixPigConfigurationIT.java | 109 ------
.../apache/phoenix/pig/PhoenixHBaseLoader.java | 45 +--
.../apache/phoenix/pig/PhoenixHBaseStorage.java | 218 ++++++------
.../phoenix/pig/PhoenixPigConfiguration.java | 340 -------------------
.../phoenix/pig/hadoop/PhoenixInputFormat.java | 142 --------
.../phoenix/pig/hadoop/PhoenixInputSplit.java | 134 --------
.../pig/hadoop/PhoenixOutputCommitter.java | 111 ------
.../phoenix/pig/hadoop/PhoenixOutputFormat.java | 94 -----
.../phoenix/pig/hadoop/PhoenixRecord.java | 112 ------
.../phoenix/pig/hadoop/PhoenixRecordReader.java | 142 --------
.../phoenix/pig/hadoop/PhoenixRecordWriter.java | 83 -----
.../util/ColumnInfoToStringEncoderDecoder.java | 69 ----
.../phoenix/pig/util/PhoenixPigSchemaUtil.java | 27 +-
.../pig/util/QuerySchemaParserFunction.java | 21 +-
.../pig/util/SqlQueryToColumnInfoFunction.java | 51 ++-
.../org/apache/phoenix/pig/util/TypeUtil.java | 4 +-
.../pig/writable/PhoenixPigDBWritable.java | 121 +++++++
.../pig/PhoenixPigConfigurationTest.java | 49 ---
.../ColumnInfoToStringEncoderDecoderTest.java | 61 ----
.../pig/util/PhoenixPigSchemaUtilTest.java | 17 +-
.../pig/util/QuerySchemaParserFunctionTest.java | 22 +-
.../util/SqlQueryToColumnInfoFunctionTest.java | 22 +-
37 files changed, 1642 insertions(+), 1672 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
new file mode 100644
index 0000000..7c67c2c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link InputFormat} implementation for Phoenix.
+ *
+ */
+public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable,T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
+
+ /**
+ * instantiated by framework
+ */
+ public PhoenixInputFormat() {
+ }
+
+ @Override
+ public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+
+ final Configuration configuration = context.getConfiguration();
+ final QueryPlan queryPlan = getQueryPlan(context,configuration);
+ @SuppressWarnings("unchecked")
+ final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
+ return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+ }
+
+
+
+ @Override
+ public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ final Configuration configuration = context.getConfiguration();
+ final QueryPlan queryPlan = getQueryPlan(context,configuration);
+ final List<KeyRange> allSplits = queryPlan.getSplits();
+ final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+ return splits;
+ }
+
+ private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
+ Preconditions.checkNotNull(qplan);
+ Preconditions.checkNotNull(splits);
+ final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+ for (List<Scan> scans : qplan.getScans()) {
+ psplits.add(new PhoenixInputSplit(scans));
+ }
+ return psplits;
+ }
+
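+ /**
+ * Returns the optimized query plan for the select statement stored in the configuration.
+ * @param context the job context; must not be null
+ * @param configuration the configuration used to obtain the connection and the select statement
+ * @return the query plan, with its iterator already initialized so the parallel scans are set up
+ * @throws RuntimeException wrapping any exception raised while connecting or building the plan
+ */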
+ private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
+ Preconditions.checkNotNull(context);
+ try{
+ final Connection connection = ConnectionUtil.getConnection(configuration);
+ final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(selectStatement);
+ final Statement statement = connection.createStatement();
+ final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+ // Initialize the query plan so it sets up the parallel scans
+ queryPlan.iterator();
+ return queryPlan;
+ } catch(Exception exception) {
+ LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
+ throw new RuntimeException(exception);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
new file mode 100644
index 0000000..b222fc9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.phoenix.query.KeyRange;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * Input split holding a list of {@link Scan}s and the {@link KeyRange} running from the first scan's start row to the last scan's stop row.
+ */
+public class PhoenixInputSplit extends InputSplit implements Writable {
+
+ private List<Scan> scans;
+ private KeyRange keyRange;
+
+ /**
+ * No Arg constructor
+ */
+ public PhoenixInputSplit() {
+ }
+
+ /**
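+ * Creates a split over the given scans and derives its key range from them.
+ * @param scans the scans covered by this split; must be non-null and non-empty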
+ */
+ public PhoenixInputSplit(final List<Scan> scans) {
+ Preconditions.checkNotNull(scans);
+ Preconditions.checkState(!scans.isEmpty());
+ this.scans = scans;
+ init();
+ }
+
+ public List<Scan> getScans() {
+ return scans;
+ }
+
+ public KeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ private void init() {
+ this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ int count = WritableUtils.readVInt(input);
+ scans = Lists.newArrayListWithExpectedSize(count);
+ for (int i = 0; i < count; i++) {
+ byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
+ input.readFully(protoScanBytes);
+ ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
+ Scan scan = ProtobufUtil.toScan(protoScan);
+ scans.add(scan);
+ }
+ init();
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ Preconditions.checkNotNull(scans);
+ WritableUtils.writeVInt(output, scans.size());
+ for (Scan scan : scans) {
+ ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
+ byte[] protoScanBytes = protoScan.toByteArray();
+ WritableUtils.writeVInt(output, protoScanBytes.length);
+ output.write(protoScanBytes);
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + keyRange.hashCode();
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) { return true; }
+ if (obj == null) { return false; }
+ if (!(obj instanceof PhoenixInputSplit)) { return false; }
+ PhoenixInputSplit other = (PhoenixInputSplit)obj;
+ if (keyRange == null) {
+ if (other.keyRange != null) { return false; }
+ } else if (!keyRange.equals(other.keyRange)) { return false; }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
new file mode 100644
index 0000000..ffee5c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A no-op {@link OutputCommitter}
+ */
+public class PhoenixOutputCommitter extends OutputCommitter {
+
+ public PhoenixOutputCommitter() {}
+
+ @Override
+ public void abortTask(TaskAttemptContext context) throws IOException {
+ }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ }
+
+ @Override
+ public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+ return true;
+ }
+
+ @Override
+ public void setupJob(JobContext jobContext) throws IOException {
+ }
+
+ @Override
+ public void setupTask(TaskAttemptContext context) throws IOException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
new file mode 100644
index 0000000..e55b977
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+
+/**
+ * {@link OutputFormat} implementation for Phoenix.
+ *
+ */
+public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
+ private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+
+ @Override
+ public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
+ }
+
+ /**
+ * Returns a new {@link PhoenixOutputCommitter}.
+ */
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+ return new PhoenixOutputCommitter();
+ }
+
+ @Override
+ public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ return new PhoenixRecordWriter<T>(context.getConfiguration());
+ } catch (SQLException e) {
+ LOG.error("Error calling PhoenixRecordWriter " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
new file mode 100644
index 0000000..2c206ab
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.ConcatResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * {@link RecordReader} implementation that iterates over the records.
+ */
+public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
+ private final Configuration configuration;
+ private final QueryPlan queryPlan;
+ private NullWritable key = NullWritable.get();
+ private T value = null;
+ private Class<T> inputClass;
+ private ResultIterator resultIterator = null;
+ private PhoenixResultSet resultSet;
+
+ public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+ Preconditions.checkNotNull(configuration);
+ Preconditions.checkNotNull(queryPlan);
+ this.inputClass = inputClass;
+ this.configuration = configuration;
+ this.queryPlan = queryPlan;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(resultIterator != null) {
+ try {
+ resultIterator.close();
+ } catch (SQLException e) {
+ LOG.error(" Error closing resultset.");
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public T getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
+ final List<Scan> scans = pSplit.getScans();
+ try {
+ List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+ for (Scan scan : scans) {
+ final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
+ PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+ iterators.add(peekingResultIterator);
+ }
+ ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
+ if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
+ iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
+ }
+ this.resultIterator = iterator;
+ this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
+ Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (key == null) {
+ key = NullWritable.get();
+ }
+ if (value == null) {
+ value = ReflectionUtils.newInstance(inputClass, this.configuration);
+ }
+ Preconditions.checkNotNull(this.resultSet);
+ try {
+ if(!resultSet.next()) {
+ return false;
+ }
+ value.readFields(resultSet);
+ return true;
+ } catch (SQLException e) {
+ LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
new file mode 100644
index 0000000..4d26bf4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+
+/**
+ * Default {@link RecordWriter} implementation for Phoenix
+ *
+ */
+public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<NullWritable, T> {
+
+ private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
+
+ private final Connection conn;
+ private final PreparedStatement statement;
+ private final long batchSize;
+ private long numRecords = 0;
+
+ public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
+ this.conn = ConnectionUtil.getConnection(configuration);
+ this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
+ final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+ this.statement = this.conn.prepareStatement(upsertQuery);
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ statement.executeBatch();
+ conn.commit();
+ } catch (SQLException e) {
+ LOG.error("SQLException while performing the commit for the task.");
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ statement.close();
+ conn.close();
+ }
+ catch (SQLException ex) {
+ LOG.error("SQLException while closing the connection for the task.");
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ @Override
+ public void write(NullWritable n, T record) throws IOException, InterruptedException {
+ try {
+ record.write(statement);
+ numRecords++;
+ statement.addBatch();
+ if (numRecords % batchSize == 0) {
+ LOG.debug("commit called on a batch of size : " + batchSize);
+ statement.executeBatch();
+ conn.commit();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Exception while committing to database.", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
new file mode 100644
index 0000000..ec52fba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.util.List;
+
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Codec that serializes a list of {@link ColumnInfo} into one delimited {@link String}
+ * and parses such a string back into a list.
+ */
+public class ColumnInfoToStringEncoderDecoder {
+
+    private static final String COLUMN_INFO_DELIMITER = "|";
+
+    private ColumnInfoToStringEncoderDecoder() {
+        // static helpers only
+    }
+
+    /**
+     * Joins the string form of every non-null element with the column info delimiter.
+     *
+     * @param columnInfos the column infos to encode; must not be null, null elements are skipped
+     * @return the delimited string
+     */
+    public static String encode(List<ColumnInfo> columnInfos) {
+        Preconditions.checkNotNull(columnInfos);
+        final Joiner nullSkippingJoiner = Joiner.on(COLUMN_INFO_DELIMITER).skipNulls();
+        return nullSkippingJoiner.join(columnInfos);
+    }
+
+    /**
+     * Splits the string on the column info delimiter, drops empty tokens and converts each
+     * remaining token with {@link ColumnInfo#fromString(String)}.
+     *
+     * @param columnInfoStr the delimited string; must not be null
+     * @return a new list with one entry per non-empty token
+     */
+    public static List<ColumnInfo> decode(final String columnInfoStr) {
+        Preconditions.checkNotNull(columnInfoStr);
+        final Splitter tokenizer = Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings();
+        final List<ColumnInfo> decoded = Lists.newArrayList();
+        for (String token : tokenizer.split(columnInfoStr)) {
+            decoded.add(ColumnInfo.fromString(token));
+        }
+        return decoded;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
new file mode 100644
index 0000000..0864cba
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Utility class that opens a JDBC {@link Connection} from a Hadoop {@link Configuration}.
+ */
+public class ConnectionUtil {
+
+    private ConnectionUtil() {
+        // static helpers only; matches the other utility classes in this package
+    }
+
+    /**
+     * Opens a new {@link Connection} using the URL that {@link QueryUtil#getUrl(String)} builds
+     * from the {@link HConstants#ZOOKEEPER_QUORUM} value of the configuration. No additional
+     * connection properties are passed. The caller owns the returned connection and is
+     * responsible for closing it.
+     *
+     * @param configuration the configuration to read the ZooKeeper quorum from; must not be null
+     * @return a newly opened connection
+     * @throws SQLException if the driver cannot open the connection
+     */
+    public static Connection getConnection(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final Properties props = new Properties();
+        final String url = QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM));
+        return DriverManager.getConnection(url, props);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
new file mode 100644
index 0000000..83a606b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+/**
+ * A utility class to set and read Phoenix properties on a {@link Configuration} instance.
+ * Used as part of Map Reduce job configuration. Several getters derive a value on first use
+ * (column metadata, upsert/select statements, batch size) and store it back into the
+ * configuration so later calls can reuse it.
+ */
+public final class PhoenixConfigurationUtil {
+
+    private static final Log LOG = LogFactory.getLog(PhoenixConfigurationUtil.class);
+
+    public static final String UPSERT_COLUMNS = "phoenix.upsert.columns";
+
+    public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
+
+    public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
+
+    public static final String SELECT_STATEMENT = "phoenix.select.stmt";
+
+    public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
+
+    // NOTE(review): the key is spelled "phoneix". It is left unchanged here because renaming it
+    // would change the configuration key that existing jobs may already set.
+    public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
+
+    public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
+
+    public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
+
+    public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
+
+    public static final String INPUT_TABLE_NAME = "phoenix.input.table.name" ;
+
+    public static final String INPUT_TABLE_CONDITIONS = "phoenix.input.table.conditions" ;
+
+    public static final String OUTPUT_TABLE_NAME = "phoenix.output.table.name" ;
+
+    public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
+
+    public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
+
+    public static final String INPUT_CLASS = "phoenix.input.class";
+
+    /** Whether the input was configured from a table (plus columns) or from a query. */
+    public enum SchemaType {
+        TABLE,
+        QUERY;
+    }
+
+    private PhoenixConfigurationUtil(){
+        // static helpers only
+    }
+
+    /**
+     * Stores the input table name.
+     *
+     * @param configuration the job configuration; must not be null
+     * @param tableName the input table name; must not be null
+     */
+    public static void setInputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(INPUT_TABLE_NAME, tableName);
+    }
+
+    /**
+     * Stores the condition clause used when the select statement is generated.
+     *
+     * @param configuration the job configuration; must not be null
+     * @param conditions the condition clause; must not be null
+     */
+    public static void setInputTableConditions(final Configuration configuration, final String conditions) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(conditions);
+        configuration.set(INPUT_TABLE_CONDITIONS, conditions);
+    }
+
+    /** Stores the select column names, joined with {@link #DEFAULT_COLUMN_NAMES_DELIMITER}. */
+    public static void setSelectColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String selectColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(SELECT_COLUMNS, selectColumnNames);
+    }
+
+    /** Stores the select column names exactly as given (an already delimited string). */
+    public static void setSelectColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(SELECT_COLUMNS, columns);
+    }
+
+    /** Stores the {@link DBWritable} class used for input records. */
+    public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) {
+        Preconditions.checkNotNull(configuration);
+        configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class);
+    }
+
+    /** Stores an explicit select statement; it takes precedence over a generated one. */
+    public static void setInputQuery(final Configuration configuration, final String inputQuery) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(inputQuery);
+        configuration.set(SELECT_STATEMENT, inputQuery);
+    }
+
+    /** Stores the schema type by its enum name. */
+    public static void setSchemaType(Configuration configuration, final SchemaType schemaType) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(schemaType);
+        configuration.set(SCHEMA_TYPE, schemaType.name());
+    }
+
+    /** Stores the output table name. */
+    public static void setOutputTableName(final Configuration configuration, final String tableName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(tableName);
+        configuration.set(OUTPUT_TABLE_NAME, tableName);
+    }
+
+    /** Stores the upsert column names, joined with {@link #DEFAULT_COLUMN_NAMES_DELIMITER}. */
+    public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) {
+        Preconditions.checkNotNull(configuration);
+        final String upsertColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns);
+        configuration.set(UPSERT_COLUMNS, upsertColumnNames);
+    }
+
+    /** Stores the upsert column names exactly as given (an already delimited string). */
+    public static void setUpsertColumnNames(final Configuration configuration,final String columns) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(UPSERT_COLUMNS, columns);
+    }
+
+    /** Stores the upsert batch size. */
+    public static void setBatchSize(final Configuration configuration, final Long batchSize) {
+        Preconditions.checkNotNull(configuration);
+        // Fail with an explicit check rather than an unboxing NullPointerException.
+        Preconditions.checkNotNull(batchSize);
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+    }
+
+    /** Returns the configured input class, or {@link NullDBWritable} when none is set. */
+    public static Class<?> getInputClass(final Configuration configuration) {
+        return configuration.getClass(INPUT_CLASS, NullDBWritable.class);
+    }
+
+    /**
+     * Returns the configured schema type.
+     *
+     * @throws NullPointerException if no schema type has been set
+     */
+    public static SchemaType getSchemaType(final Configuration configuration) {
+        final String schemaTp = configuration.get(SCHEMA_TYPE);
+        Preconditions.checkNotNull(schemaTp);
+        return SchemaType.valueOf(schemaTp);
+    }
+
+    /**
+     * Returns the column metadata for the upsert. A previously encoded list stored under
+     * {@link #UPSERT_COLUMN_INFO_KEY} is decoded and returned; otherwise the metadata is
+     * generated over a temporary connection, stored in the configuration and returned.
+     *
+     * @throws SQLException if the connection cannot be opened or the metadata cannot be generated
+     */
+    public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final List<String> upsertColumnList = getUpsertColumnList(configuration);
+        final List<ColumnInfo> columnMetadataList;
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        try {
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
+        } finally {
+            // Close the connection even when metadata generation fails.
+            connection.close();
+        }
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+        // we put the encoded column infos in the Configuration for re usability.
+        configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+        return columnMetadataList;
+    }
+
+    /** Parses the configured upsert column names; returns null when none are configured. */
+    private static List<String> getUpsertColumnList(final Configuration configuration) {
+        final String upsertColumns = configuration.get(UPSERT_COLUMNS);
+        List<String> upsertColumnList = null;
+        if(isNotEmpty(upsertColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
+            LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
+                    ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
+                    ));
+        }
+        return upsertColumnList;
+    }
+
+    /**
+     * Returns the upsert statement. A statement already stored under {@link #UPSERT_STATEMENT}
+     * is returned as is; otherwise one is generated, stored in the configuration and returned.
+     *
+     * @throws SQLException if the column metadata cannot be obtained
+     */
+    public static String getUpsertStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String tableName = getOutputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        String upsertStmt = configuration.get(UPSERT_STATEMENT);
+        if(isNotEmpty(upsertStmt)) {
+            return upsertStmt;
+        }
+        final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
+        final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration);
+        if (useUpsertColumns) {
+            // Generating UPSERT statement from the explicitly configured columns.
+            upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
+            LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
+        } else {
+            // Generating UPSERT statement without column name information.
+            upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
+            LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
+        }
+        configuration.set(UPSERT_STATEMENT, upsertStmt);
+        return upsertStmt;
+    }
+
+    /**
+     * Returns the column metadata for the select. A previously encoded list stored under
+     * {@link #SELECT_COLUMN_INFO_KEY} is decoded and returned; otherwise the metadata is
+     * generated over a temporary connection, stored in the configuration and returned.
+     *
+     * @throws SQLException if the connection cannot be opened or the metadata cannot be generated
+     */
+    public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
+        if(isNotEmpty(columnInfoStr)) {
+            return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final List<String> selectColumnList = getSelectColumnList(configuration);
+        final List<ColumnInfo> columnMetadataList;
+        final Connection connection = ConnectionUtil.getConnection(configuration);
+        try {
+            columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
+        } finally {
+            // Close the connection even when metadata generation fails.
+            connection.close();
+        }
+        final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+        // we put the encoded column infos in the Configuration for re usability.
+        configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
+        return columnMetadataList;
+    }
+
+    /** Parses the configured select column names; returns null when none are configured. */
+    private static List<String> getSelectColumnList(
+            final Configuration configuration) {
+        String selectColumns = configuration.get(SELECT_COLUMNS);
+        List<String> selectColumnList = null;
+        if(isNotEmpty(selectColumns)) {
+            final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
+            selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
+            LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
+                    ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
+                    ));
+        }
+        return selectColumnList;
+    }
+
+    /**
+     * Returns the select statement. A statement already stored under {@link #SELECT_STATEMENT}
+     * is returned as is; otherwise one is generated from the input table, its column metadata
+     * and the optional conditions, stored in the configuration and returned.
+     *
+     * @throws SQLException if the column metadata cannot be obtained
+     */
+    public static String getSelectStatement(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        String selectStmt = configuration.get(SELECT_STATEMENT);
+        if(isNotEmpty(selectStmt)) {
+            return selectStmt;
+        }
+        final String tableName = getInputTableName(configuration);
+        Preconditions.checkNotNull(tableName);
+        final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration);
+        final String conditions = configuration.get(INPUT_TABLE_CONDITIONS);
+        selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList, conditions);
+        LOG.info("Select Statement: "+ selectStmt);
+        configuration.set(SELECT_STATEMENT, selectStmt);
+        return selectStmt;
+    }
+
+    /**
+     * Returns the upsert batch size. When the configured value (default
+     * {@link #DEFAULT_UPSERT_BATCH_SIZE}) is not positive, the mutate batch size of a
+     * temporary {@link PhoenixConnection} is used instead. The effective value is written
+     * back to the configuration.
+     *
+     * @throws SQLException if the temporary connection cannot be opened or closed
+     */
+    public static long getBatchSize(final Configuration configuration) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
+        if(batchSize <= 0) {
+            final Connection conn = ConnectionUtil.getConnection(configuration);
+            try {
+                batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
+            } finally {
+                conn.close();
+            }
+        }
+        configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
+        return batchSize;
+    }
+
+    /**
+     * Returns the number of select columns: the number of configured column names for
+     * {@link SchemaType#QUERY}, otherwise the size of the select column metadata list.
+     *
+     * @param configuration the job configuration; must not be null
+     * @param tableName not used by this method; kept for interface compatibility
+     * @throws SQLException if the column metadata cannot be obtained
+     */
+    public static int getSelectColumnsCount(Configuration configuration,
+            String tableName) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        final SchemaType schemaType = getSchemaType(configuration);
+        int count = 0;
+        if(SchemaType.QUERY.equals(schemaType)) {
+            List<String> selectedColumnList = getSelectColumnList(configuration);
+            count = selectedColumnList == null ? 0 : selectedColumnList.size();
+        } else {
+            List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration);
+            count = columnInfos == null ? 0 : columnInfos.size();
+        }
+        return count;
+    }
+
+    /** Returns the configured input table name, or null when none is set. */
+    public static String getInputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(INPUT_TABLE_NAME);
+    }
+
+    /** Returns the configured output table name, or null when none is set. */
+    public static String getOutputTableName(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        return configuration.get(OUTPUT_TABLE_NAME);
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
new file mode 100644
index 0000000..f1a7f5a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+
+/**
+ * Utility class for setting Configuration parameters for the Map Reduce job
+ */
+public final class PhoenixMapReduceUtil {
+
+    private PhoenixMapReduceUtil() {
+        // static helpers only
+    }
+
+    /**
+     * Configures the job to read from a table: sets {@link PhoenixInputFormat} as the input
+     * format and records the table, projected columns, optional conditions, input class and
+     * the {@link SchemaType#TABLE} schema type in the job configuration.
+     *
+     * @param job the job to configure
+     * @param inputClass DBWritable class
+     * @param tableName Input table name
+     * @param conditions Condition clause to be added to the WHERE clause; may be null for none.
+     * @param fieldNames fields being projected for the SELECT query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) {
+        job.setInputFormatClass(PhoenixInputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames);
+        // The conditions were previously accepted but never stored, so they had no effect.
+        // The setter rejects null, hence the guard.
+        if (conditions != null) {
+            PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+        }
+        PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE);
+    }
+
+    /**
+     * Configures the job to read the result of a query: sets {@link PhoenixInputFormat} as the
+     * input format and records the table, query, input class and the
+     * {@link SchemaType#QUERY} schema type in the job configuration.
+     *
+     * @param job the job to configure
+     * @param inputClass DBWritable class
+     * @param tableName Input table name
+     * @param inputQuery Select query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) {
+        job.setInputFormatClass(PhoenixInputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+        PhoenixConfigurationUtil.setInputClass(configuration,inputClass);
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+    }
+
+    /**
+     * Configures the job to write to a table: sets {@link PhoenixOutputFormat} as the output
+     * format and records the table and the upsert columns in the job configuration.
+     *
+     * @param job the job to configure
+     * @param tableName Output table
+     * @param columns List of columns separated by ,
+     */
+    public static void setOutput(final Job job, final String tableName,final String columns) {
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns);
+    }
+
+
+    /**
+     * Configures the job to write to a table: sets {@link PhoenixOutputFormat} as the output
+     * format and records the table and the upsert columns in the job configuration.
+     *
+     * @param job the job to configure
+     * @param tableName Output table
+     * @param fieldNames fields
+     */
+    public static void setOutput(final Job job, final String tableName , final String... fieldNames) {
+        job.setOutputFormatClass(PhoenixOutputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setOutputTableName(configuration, tableName);
+        PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames);
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index f7abe7e..2dc4029 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -325,8 +325,8 @@ public class PhoenixRuntime {
if (columns == null) {
// use all columns in the table
for(PColumn pColumn : table.getColumns()) {
- int sqlType = pColumn.getDataType().getResultSetSqlType();
- columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
+ int sqlType = pColumn.getDataType().getSqlType();
+ columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType));
}
} else {
// Leave "null" as indication to skip b/c it doesn't exist
@@ -405,7 +405,7 @@ public class PhoenixRuntime {
if (pColumn==null) {
throw new SQLException("pColumn must not be null.");
}
- int sqlType = pColumn.getDataType().getResultSetSqlType();
+ int sqlType = pColumn.getDataType().getSqlType();
ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType);
return columnInfo;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index af77001..b91fddc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -145,29 +145,33 @@ public final class QueryUtil {
*
* @param fullTableName name of the table for which the select statement needs to be created.
* @param columnInfos list of columns to be projected in the select statement.
+ * @param conditions The condition clause to be added to the WHERE condition
* @return Select Query
*/
- public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) {
+ public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) {
Preconditions.checkNotNull(fullTableName,"Table name cannot be null");
if(columnInfos == null || columnInfos.isEmpty()) {
throw new IllegalArgumentException("At least one column must be provided");
}
// escape the table name to ensure it is case sensitive.
final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName);
- StringBuilder sb = new StringBuilder();
- sb.append("SELECT ");
+ StringBuilder query = new StringBuilder();
+ query.append("SELECT ");
for (ColumnInfo cinfo : columnInfos) {
if (cinfo != null) {
String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName());
- sb.append(fullColumnName);
- sb.append(",");
+ query.append(fullColumnName);
+ query.append(",");
}
}
// Remove the trailing comma
- sb.setLength(sb.length() - 1);
- sb.append(" FROM ");
- sb.append(escapedFullTableName);
- return sb.toString();
+ query.setLength(query.length() - 1);
+ query.append(" FROM ");
+ query.append(escapedFullTableName);
+ if(conditions != null && conditions.length() > 0) {
+ query.append(" WHERE (").append(conditions).append(")");
+ }
+ return query.toString();
}
public static String getUrl(String server) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
new file mode 100644
index 0000000..1004981
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
+ */
+public class ColumnInfoToStringEncoderDecoderTest {
+
+    /** Encoding a single column info yields that column info's string form. */
+    @Test
+    public void testEncode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        assertEquals(columnInfo.toString(),encodedColumnInfo);
+    }
+
+    /** Decoding an encoded single column info yields one equivalent column info. */
+    @Test
+    public void testDecode() {
+        final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
+        // Exercise decode itself; previously this test only repeated the encode assertion.
+        final List<ColumnInfo> decodedColumnInfos = ColumnInfoToStringEncoderDecoder.decode(encodedColumnInfo);
+        assertEquals(1,decodedColumnInfos.size());
+        // Compared through the string form, which is what the codec round-trips.
+        assertEquals(columnInfo.toString(),decodedColumnInfos.get(0).toString());
+    }
+
+    /** Null entries are skipped when encoding, so they do not appear after decoding. */
+    @Test
+    public void testEncodeDecodeWithNulls() {
+        final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
+        final ColumnInfo columnInfo2 = null;
+        final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
+        final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
+        assertEquals(1,decodedColumnInfo.size());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
new file mode 100644
index 0000000..33c7531
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+/**
+ * Test for {@link PhoenixConfigurationUtil}
+ */
+public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
+
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    /** DDL for the three-column table with a composite primary key shared by the first three tests. */
+    private static final String COMPOSITE_PK_DDL = "CREATE TABLE "+ TABLE_NAME +
+            " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
+            " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
+
+    /** Opens a connection to the test URL using a copy of the shared test properties. */
+    private Connection openConnection() throws Exception {
+        return DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
+    }
+
+    /** Builds a fresh configuration whose ZooKeeper quorum is set to the test URL. */
+    private Configuration newConfiguration() {
+        final Configuration conf = new Configuration();
+        conf.set(HConstants.ZOOKEEPER_QUORUM, getUrl());
+        return conf;
+    }
+
+    /** Expects an upsert with three placeholders when only the output table name is configured. */
+    @Test
+    public void testUpsertStatement() throws Exception {
+        final Connection connection = openConnection();
+        try {
+            connection.createStatement().execute(COMPOSITE_PK_DDL);
+            final Configuration conf = newConfiguration();
+            PhoenixConfigurationUtil.setOutputTableName(conf, TABLE_NAME);
+            final String actual = PhoenixConfigurationUtil.getUpsertStatement(conf);
+            assertEquals("UPSERT INTO " + TABLE_NAME + " VALUES (?, ?, ?)", actual);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Expects a select over all three columns when only the input table name is configured. */
+    @Test
+    public void testSelectStatement() throws Exception {
+        final Connection connection = openConnection();
+        try {
+            connection.createStatement().execute(COMPOSITE_PK_DDL);
+            final Configuration conf = newConfiguration();
+            PhoenixConfigurationUtil.setInputTableName(conf, TABLE_NAME);
+            final String actual = PhoenixConfigurationUtil.getSelectStatement(conf);
+            assertEquals("SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(TABLE_NAME), actual);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Expects the select to contain only the column named through the select column names. */
+    @Test
+    public void testSelectStatementForSpecificColumns() throws Exception {
+        final Connection connection = openConnection();
+        try {
+            connection.createStatement().execute(COMPOSITE_PK_DDL);
+            final Configuration conf = newConfiguration();
+            PhoenixConfigurationUtil.setInputTableName(conf, TABLE_NAME);
+            PhoenixConfigurationUtil.setSelectColumnNames(conf, "A_BINARY");
+            final String actual = PhoenixConfigurationUtil.getSelectStatement(conf);
+            assertEquals("SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(TABLE_NAME), actual);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Expects both the key and the VARCHAR[] column in the select when the schema type is QUERY. */
+    @Test
+    public void testSelectStatementForArrayTypes() throws Exception {
+        final Connection connection = openConnection();
+        try {
+            connection.createStatement().execute("CREATE TABLE "+ TABLE_NAME + " (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY VARCHAR[])\n");
+            final Configuration conf = newConfiguration();
+            PhoenixConfigurationUtil.setSelectColumnNames(conf,"ID,VCARRAY");
+            PhoenixConfigurationUtil.setSchemaType(conf, SchemaType.QUERY);
+            PhoenixConfigurationUtil.setInputTableName(conf, TABLE_NAME);
+            final String actual = PhoenixConfigurationUtil.getSelectStatement(conf);
+            assertEquals("SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + SchemaUtil.getEscapedArgument(TABLE_NAME), actual);
+        } finally {
+            connection.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 182eb56..33e3b5a 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -64,7 +64,7 @@ public class QueryUtilTest {
public void testConstructSelectStatement() {
assertEquals(
"SELECT \"ID\",\"NAME\" FROM \"MYTAB\"",
- QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN)));
+ QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null));
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
deleted file mode 100644
index efbfbf8..0000000
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Test;
-
-
-public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT {
- private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-
- @Test
- public void testUpsertStatement() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- final String upserStatement = configuration.getUpsertStatement();
- final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
- assertEquals(expectedUpsertStatement, upserStatement);
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectStatement() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- final String selectStatement = configuration.getSelectStatement();
- final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
- assertEquals(expectedSelectStatement, selectStatement);
- } finally {
- conn.close();
- }
- }
-
- @Test
- public void testSelectStatementForSpecificColumns() throws Exception {
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- final String tableName = "TEST_TABLE";
- try {
- String ddl = "CREATE TABLE "+ tableName +
- " (a_string varchar not null, a_binary varbinary not null, col1 integer" +
- " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n";
- createTestTable(getUrl(), ddl);
- final PhoenixPigConfiguration configuration = newConfiguration (tableName);
- configuration.setSelectColumns("A_BINARY");
- final String selectStatement = configuration.getSelectStatement();
- final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ;
- assertEquals(expectedSelectStatement, selectStatement);
- } finally {
- conn.close();
- }
- }
-
- private PhoenixPigConfiguration newConfiguration(String tableName) {
- final Configuration configuration = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100);
- return phoenixConfiguration;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
index d8bedf6..1218e82 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java
@@ -28,18 +28,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
-import org.apache.phoenix.pig.hadoop.PhoenixInputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
import org.apache.phoenix.pig.util.QuerySchemaParserFunction;
import org.apache.phoenix.pig.util.TableSchemaParserFunction;
import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
@@ -83,12 +86,12 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
private static final String PHOENIX_QUERY_SCHEME = "hbase://query/";
private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema";
- private PhoenixPigConfiguration config;
+ private Configuration config;
private String tableName;
private String selectQuery;
private String zkQuorum ;
- private PhoenixInputFormat inputFormat;
- private RecordReader<NullWritable, PhoenixRecord> reader;
+ private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat;
+ private RecordReader<NullWritable,PhoenixPigDBWritable> reader;
private String contextSignature;
private ResourceSchema schema;
@@ -107,6 +110,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
final Configuration configuration = job.getConfiguration();
//explicitly turning off combining splits.
configuration.setBoolean("pig.noSplitCombination", true);
+ //to have phoenix working on a secured cluster
+ TableMapReduceUtil.initCredentials(job);
this.initializePhoenixPigConfiguration(location, configuration);
}
@@ -120,21 +125,22 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
if(this.config != null) {
return;
}
- this.config = new PhoenixPigConfiguration(configuration);
- this.config.setServerName(this.zkQuorum);
+ this.config = configuration;
+ this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum);
+ PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class);
Pair<String,String> pair = null;
try {
if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) {
String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length());
final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction();
pair = parseFunction.apply(tableSchema);
- this.config.setSchemaType(SchemaType.TABLE);
+ PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.TABLE);
} else if (location.startsWith(PHOENIX_QUERY_SCHEME)) {
this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length());
final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config);
pair = queryParseFunction.apply(this.selectQuery);
- config.setSelectStatement(this.selectQuery);
- this.config.setSchemaType(SchemaType.QUERY);
+ PhoenixConfigurationUtil.setInputQuery(this.config, this.selectQuery);
+ PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.QUERY);
}
this.tableName = pair.getFirst();
final String selectedColumns = pair.getSecond();
@@ -142,9 +148,9 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) {
printUsage(location);
}
- this.config.setTableName(this.tableName);
+ PhoenixConfigurationUtil.setInputTableName(this.config, this.tableName);
if(!isEmpty(selectedColumns)) {
- this.config.setSelectColumns(selectedColumns);
+ PhoenixConfigurationUtil.setSelectColumnNames(this.config, selectedColumns);
}
} catch(IllegalArgumentException iae) {
printUsage(location);
@@ -160,7 +166,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
@Override
public InputFormat getInputFormat() throws IOException {
if(inputFormat == null) {
- inputFormat = new PhoenixInputFormat();
+ inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>();
+ PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class);
}
return inputFormat;
}
@@ -188,13 +195,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata {
public Tuple getNext() throws IOException {
try {
if(!reader.nextKeyValue()) {
- return null;
- }
- final PhoenixRecord phoenixRecord = reader.getCurrentValue();
- if(phoenixRecord == null) {
+ return null;
+ }
+ final PhoenixPigDBWritable record = reader.getCurrentValue();
+ if(record == null) {
return null;
}
- final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields());
+ final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields());
return tuple;
} catch (InterruptedException e) {
int errCode = 6018;
[2/3] phoenix git commit: PHOENIX-1454 Map Reduce over Phoenix tables
Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index 500e403..eb2c124 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.pig;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.List;
import java.util.Properties;
import org.apache.commons.cli.CommandLine;
@@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
+import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
@@ -75,74 +81,76 @@ import org.apache.pig.impl.util.UDFContext;
@SuppressWarnings("rawtypes")
public class PhoenixHBaseStorage implements StoreFuncInterface {
- private PhoenixPigConfiguration config;
- private RecordWriter<NullWritable, PhoenixRecord> writer;
- private String contextSignature = null;
- private ResourceSchema schema;
- private long batchSize;
- private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
-
- // Set of options permitted
- private final static Options validOptions = new Options();
- private final static CommandLineParser parser = new GnuParser();
- private final static String SCHEMA = "_schema";
-
- private final CommandLine configuredOptions;
- private final String server;
-
- public PhoenixHBaseStorage(String server) throws ParseException {
- this(server, null);
- }
-
- public PhoenixHBaseStorage(String server, String optString)
- throws ParseException {
- populateValidOptions();
- this.server = server;
-
- String[] optsArr = optString == null ? new String[0] : optString.split(" ");
- try {
- configuredOptions = parser.parse(validOptions, optsArr);
- } catch (ParseException e) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("[-batchSize]", validOptions);
- throw e;
- }
-
- batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
- }
-
- private static void populateValidOptions() {
- validOptions.addOption("batchSize", true, "Specify upsert batch size");
- }
-
- /**
- * Returns UDFProperties based on <code>contextSignature</code>.
- */
- private Properties getUDFProperties() {
- return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
- }
-
-
- /**
- * Parse the HBase table name and configure job
- */
- @Override
- public void setStoreLocation(String location, Job job) throws IOException {
- URI locationURI;
+ private Configuration config;
+ private RecordWriter<NullWritable, PhoenixPigDBWritable> writer;
+ private List<ColumnInfo> columnInfo = null;
+ private String contextSignature = null;
+ private ResourceSchema schema;
+ private long batchSize;
+ private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+
+ // Set of options permitted
+ private final static Options validOptions = new Options();
+ private final static CommandLineParser parser = new GnuParser();
+ private final static String SCHEMA = "_schema";
+
+ private final CommandLine configuredOptions;
+ private final String server;
+
+ public PhoenixHBaseStorage(String server) throws ParseException {
+ this(server, null);
+ }
+
+ public PhoenixHBaseStorage(String server, String optString)
+ throws ParseException {
+ populateValidOptions();
+ this.server = server;
+
+ String[] optsArr = optString == null ? new String[0] : optString.split(" ");
+ try {
+ configuredOptions = parser.parse(validOptions, optsArr);
+ } catch (ParseException e) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("[-batchSize]", validOptions);
+ throw e;
+ }
+ batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize"));
+ }
+
+ private static void populateValidOptions() {
+ validOptions.addOption("batchSize", true, "Specify upsert batch size");
+ }
+
+ /**
+ * Returns UDFProperties based on <code>contextSignature</code>.
+ */
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature });
+ }
+
+
+ /**
+ * Parse the HBase table name and configure job
+ */
+ @Override
+ public void setStoreLocation(String location, Job job) throws IOException {
+ URI locationURI;
try {
locationURI = new URI(location);
if (!"hbase".equals(locationURI.getScheme())) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location));
}
+ config = job.getConfiguration();
+ config.set(HConstants.ZOOKEEPER_QUORUM, server);
String tableName = locationURI.getAuthority();
// strip off the leading path token '/'
String columns = null;
if(!locationURI.getPath().isEmpty()) {
columns = locationURI.getPath().substring(1);
+ PhoenixConfigurationUtil.setUpsertColumnNames(config, columns);
}
- config = new PhoenixPigConfiguration(job.getConfiguration());
- config.configure(server, tableName, batchSize, columns);
-
+ PhoenixConfigurationUtil.setOutputTableName(config,tableName);
+ PhoenixConfigurationUtil.setBatchSize(config,batchSize);
String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
if (serializedSchema != null) {
schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
@@ -150,59 +158,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
} catch (URISyntaxException e) {
throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e);
}
- }
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
@Override
- public void prepareToWrite(RecordWriter writer) throws IOException {
- this.writer =writer;
- }
+ public void prepareToWrite(RecordWriter writer) throws IOException {
+ this.writer = writer;
+ try {
+ this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config);
+ } catch(SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
- @Override
- public void putNext(Tuple t) throws IOException {
+ @Override
+ public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();
-
- PhoenixRecord record = new PhoenixRecord(fieldSchemas);
-
+ PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo);
for(int i=0; i<t.size(); i++) {
- record.add(t.get(i));
+ record.add(t.get(i));
+ }
+ try {
+ this.writer.write(null, record);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
- try {
- writer.write(null, record);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- }
+ }
- @Override
- public void setStoreFuncUDFContextSignature(String signature) {
+ @Override
+ public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
- }
-
- @Override
- public void cleanupOnFailure(String location, Job job) throws IOException {
- }
-
- @Override
- public void cleanupOnSuccess(String location, Job job) throws IOException {
- }
-
- @Override
- public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
- return location;
- }
-
- @Override
- public OutputFormat getOutputFormat() throws IOException {
- return outputFormat;
- }
-
- @Override
- public void checkSchema(ResourceSchema s) throws IOException {
- schema = s;
- getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
- }
+ }
+
+ @Override
+ public void cleanupOnFailure(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public void cleanupOnSuccess(String location, Job job) throws IOException {
+ }
+
+ @Override
+ public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+ return location;
+ }
+
+ @Override
+ public OutputFormat getOutputFormat() throws IOException {
+ return outputFormat;
+ }
+
+ @Override
+ public void checkSchema(ResourceSchema s) throws IOException {
+ schema = s;
+ getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
deleted file mode 100644
index c6b6ec9..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig;
-
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-
-
-/**
- * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader}
- *
- */
-public class PhoenixPigConfiguration {
-
- private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class);
-
- private PhoenixPigConfigurationUtil util;
-
- /**
- * Speculative execution of Map tasks
- */
- public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution";
-
- /**
- * Speculative execution of Reduce tasks
- */
- public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution";
-
- public static final String SERVER_NAME = "phoenix.hbase.server.name";
-
- public static final String TABLE_NAME = "phoenix.hbase.table.name";
-
- public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns";
-
- public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt";
-
- public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list";
-
- public static final String SELECT_STATEMENT = "phoenix.select.stmt";
-
- public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size";
-
- //columns projected given as part of LOAD.
- public static final String SELECT_COLUMNS = "phoneix.select.query.columns";
-
- public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list";
-
- public static final String SCHEMA_TYPE = "phoenix.select.schema.type";
-
- // the delimiter supported during LOAD and STORE when projected columns are given.
- public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter";
-
- public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000;
-
- public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ",";
-
- private final Configuration conf;
-
- public PhoenixPigConfiguration(Configuration conf) {
- this.conf = conf;
- this.util = new PhoenixPigConfigurationUtil();
- }
-
- public void configure(String server, String tableName, long batchSize) {
- configure(server,tableName,batchSize,null);
- }
-
- public void configure(String server, String tableName, long batchSize, String columns) {
- conf.set(SERVER_NAME, server);
- conf.set(TABLE_NAME, tableName);
- conf.setLong(UPSERT_BATCH_SIZE, batchSize);
- if (isNotEmpty(columns)) {
- conf.set(UPSERT_COLUMNS, columns);
- }
- conf.setBoolean(MAP_SPECULATIVE_EXEC, false);
- conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false);
- }
-
-
- /**
- * Creates a {@link Connection} with autoCommit set to false.
- * @throws SQLException
- */
- public Connection getConnection() throws SQLException {
- return getUtil().getConnection(getConfiguration());
- }
-
- public String getUpsertStatement() throws SQLException {
- return getUtil().getUpsertStatement(getConfiguration(), getTableName());
- }
-
- public long getBatchSize() throws SQLException {
- return getUtil().getBatchSize(getConfiguration());
- }
-
- public String getServer() {
- return conf.get(SERVER_NAME);
- }
-
- public List<ColumnInfo> getColumnMetadataList() throws SQLException {
- return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public String getUpsertColumns() {
- return conf.get(UPSERT_COLUMNS);
- }
-
- public String getTableName() {
- return conf.get(TABLE_NAME);
- }
-
- public Configuration getConfiguration() {
- return this.conf;
- }
-
- public String getSelectStatement() throws SQLException {
- return getUtil().getSelectStatement(getConfiguration(), getTableName());
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException {
- return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName());
- }
-
- public int getSelectColumnsCount() throws SQLException {
- return getUtil().getSelectColumnsCount(getConfiguration(), getTableName());
- }
-
- public SchemaType getSchemaType() {
- final String schemaTp = conf.get(SCHEMA_TYPE);
- return SchemaType.valueOf(schemaTp);
- }
-
-
- public void setServerName(final String zookeeperQuorum) {
- this.conf.set(SERVER_NAME, zookeeperQuorum);
- }
-
- public void setTableName(final String tableName) {
- Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!");
- this.conf.set(TABLE_NAME, tableName);
- }
-
- public void setSelectStatement(final String selectStatement) {
- this.conf.set(SELECT_STATEMENT, selectStatement);
- }
-
- public void setSelectColumns(String selectColumns) {
- this.conf.set(SELECT_COLUMNS, selectColumns);
- }
-
- public PhoenixPigConfigurationUtil getUtil() {
- return this.util;
- }
-
- public void setSchemaType(final SchemaType schemaType) {
- this.conf.set(SCHEMA_TYPE, schemaType.name());
- }
-
- public enum SchemaType {
- TABLE,
- QUERY;
- }
-
-
- @VisibleForTesting
- static class PhoenixPigConfigurationUtil {
-
- public Connection getConnection(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Properties props = new Properties();
- final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class);
- conn.setAutoCommit(false);
- return conn;
- }
-
- public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- String upsertColumns = configuration.get(UPSERT_COLUMNS);
- List<String> upsertColumnList = null;
- if(isNotEmpty(upsertColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns));
- LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s "
- ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList)
- ));
- }
- List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for re usability.
- configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String upsertStmt = configuration.get(UPSERT_STATEMENT);
- if(isNotEmpty(upsertStmt)) {
- return upsertStmt;
- }
- final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,""));
- final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName);
- if (useUpsertColumns) {
- // Generating UPSERT statement with explicit column name information.
- upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList);
- LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt);
- } else {
- // Generating UPSERT statement without column name information.
- upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size());
- LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt);
- }
- configuration.set(UPSERT_STATEMENT, upsertStmt);
- return upsertStmt;
-
- }
-
- public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY);
- if(isNotEmpty(columnInfoStr)) {
- return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- }
- final Connection connection = getConnection(configuration);
- final List<String> selectColumnList = getSelectColumnList(configuration);
- final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList);
- final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
- // we put the encoded column infos in the Configuration for re usability.
- configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos);
- closeConnection(connection);
- return columnMetadataList;
- }
-
- private List<String> getSelectColumnList(
- final Configuration configuration) {
- String selectColumns = configuration.get(SELECT_COLUMNS);
- List<String> selectColumnList = null;
- if(isNotEmpty(selectColumns)) {
- final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER);
- selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns));
- LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s "
- ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList)
- ));
- }
- return selectColumnList;
- }
-
- public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- Preconditions.checkNotNull(tableName);
- String selectStmt = configuration.get(SELECT_STATEMENT);
- if(isNotEmpty(selectStmt)) {
- return selectStmt;
- }
- final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName);
- selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList);
- LOG.info("Select Statement: "+ selectStmt);
- configuration.set(SELECT_STATEMENT, selectStmt);
- return selectStmt;
- }
-
- public long getBatchSize(final Configuration configuration) throws SQLException {
- Preconditions.checkNotNull(configuration);
- long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE);
- if(batchSize <= 0) {
- Connection conn = getConnection(configuration);
- batchSize = ((PhoenixConnection) conn).getMutateBatchSize();
- closeConnection(conn);
- }
- configuration.setLong(UPSERT_BATCH_SIZE, batchSize);
- return batchSize;
- }
-
- public int getSelectColumnsCount(Configuration configuration,
- String tableName) throws SQLException {
- Preconditions.checkNotNull(configuration);
- final String schemaTp = configuration.get(SCHEMA_TYPE);
- final SchemaType schemaType = SchemaType.valueOf(schemaTp);
- int count = 0;
- if(SchemaType.QUERY.equals(schemaType)) {
- List<String> selectedColumnList = getSelectColumnList(configuration);
- count = selectedColumnList == null ? 0 : selectedColumnList.size();
- } else {
- List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName);
- count = columnInfos == null ? 0 : columnInfos.size();
- }
- return count;
- }
-
- private void closeConnection(final Connection connection) throws SQLException {
- if(connection != null) {
- connection.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
deleted file mode 100644
index b58e7e3..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * The InputFormat class for generating the splits and creating the record readers.
- *
- */
-public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class);
- private PhoenixPigConfiguration phoenixConfiguration;
- private Connection connection;
- private QueryPlan queryPlan;
-
- /**
- * instantiated by framework
- */
- public PhoenixInputFormat() {
- }
-
- @Override
- public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context)
- throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- try {
- return new PhoenixRecordReader(phoenixConfiguration,queryPlan);
- }catch(SQLException sqle) {
- throw new IOException(sqle);
- }
- }
-
-
-
- @Override
- public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
- setConf(context.getConfiguration());
- final QueryPlan queryPlan = getQueryPlan(context);
- final List<KeyRange> allSplits = queryPlan.getSplits();
- final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
- return splits;
- }
-
- private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException {
- Preconditions.checkNotNull(qplan);
- Preconditions.checkNotNull(splits);
- final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
- for (List<Scan> scans : qplan.getScans()) {
- psplits.add(new PhoenixInputSplit(scans));
- }
- return psplits;
- }
-
- public void setConf(Configuration configuration) {
- this.phoenixConfiguration = new PhoenixPigConfiguration(configuration);
- }
-
- public PhoenixPigConfiguration getConf() {
- return this.phoenixConfiguration;
- }
-
- private Connection getConnection() {
- try {
- if (this.connection == null) {
- this.connection = phoenixConfiguration.getConnection();
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return connection;
- }
-
- /**
- * Returns the query plan associated with the select query.
- * @param context
- * @return
- * @throws IOException
- * @throws SQLException
- */
- private QueryPlan getQueryPlan(final JobContext context) throws IOException {
- Preconditions.checkNotNull(context);
- if(queryPlan == null) {
- try{
- final Connection connection = getConnection();
- final String selectStatement = getConf().getSelectStatement();
- Preconditions.checkNotNull(selectStatement);
- final Statement statement = connection.createStatement();
- final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
- // Optimize the query plan so that we potentially use secondary indexes
- this.queryPlan = pstmt.optimizeQuery(selectStatement);
- // Initialize the query plan so it sets up the parallel scans
- queryPlan.iterator();
- } catch(Exception exception) {
- LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
- throw new RuntimeException(exception);
- }
- }
- return queryPlan;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
deleted file mode 100644
index b1d015a..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.phoenix.query.KeyRange;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- *
- * Input split class to hold the lower and upper bound range. {@link KeyRange}
- *
- */
-public class PhoenixInputSplit extends InputSplit implements Writable {
-
- private List<Scan> scans;
- private KeyRange keyRange;
-
- /**
- * No Arg constructor
- */
- public PhoenixInputSplit() {
- }
-
- /**
- *
- * @param scans the scans that make up this split; must be non-null and non-empty
- */
- public PhoenixInputSplit(final List<Scan> scans) {
- Preconditions.checkNotNull(scans);
- Preconditions.checkState(!scans.isEmpty());
- this.scans = scans;
- init();
- }
-
- public List<Scan> getScans() {
- return scans;
- }
-
- public KeyRange getKeyRange() {
- return keyRange;
- }
-
- private void init() {
- this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow());
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- int count = WritableUtils.readVInt(input);
- scans = Lists.newArrayListWithExpectedSize(count);
- for (int i = 0; i < count; i++) {
- byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)];
- input.readFully(protoScanBytes);
- ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes);
- Scan scan = ProtobufUtil.toScan(protoScan);
- scans.add(scan);
- }
- init();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- Preconditions.checkNotNull(scans);
- WritableUtils.writeVInt(output, scans.size());
- for (Scan scan : scans) {
- ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan);
- byte[] protoScanBytes = protoScan.toByteArray();
- WritableUtils.writeVInt(output, protoScanBytes.length);
- output.write(protoScanBytes);
- }
- }
-
- @Override
- public long getLength() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public String[] getLocations() throws IOException, InterruptedException {
- return new String[]{};
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + keyRange.hashCode();
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- // TODO: review: it's a reasonable check to use the keyRange,
- // but it's not perfect. Do we need an equals impl?
- if (this == obj) { return true; }
- if (obj == null) { return false; }
- if (!(obj instanceof PhoenixInputSplit)) { return false; }
- PhoenixInputSplit other = (PhoenixInputSplit)obj;
- if (keyRange == null) {
- if (other.keyRange != null) { return false; }
- } else if (!keyRange.equals(other.keyRange)) { return false; }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
deleted file mode 100644
index a8d9d8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.jdbc.PhoenixStatement;
-
-/**
- *
- * {@link OutputCommitter} implementation for Pig tasks using Phoenix
- * connections to upsert to HBase
- *
- *
- *
- */
-public class PhoenixOutputCommitter extends OutputCommitter {
- private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class);
-
- private final PhoenixOutputFormat outputFormat;
-
- public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) {
- if(outputFormat == null) {
- throw new IllegalArgumentException("PhoenixOutputFormat must not be null.");
- }
- this.outputFormat = outputFormat;
- }
-
- /**
- * TODO implement rollback functionality.
- *
- * {@link PhoenixStatement#execute(String)} is buffered on the client, this makes
- * it difficult to implement rollback as once a commit is issued it's hard to go
- * back all the way to undo.
- */
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- }
-
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- commit(outputFormat.getConnection(context.getConfiguration()));
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
- return true;
- }
-
- @Override
- public void setupJob(JobContext jobContext) throws IOException {
- }
-
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- }
-
- /**
- * Commit a transaction on task completion
- *
- * @param connection
- * @throws IOException
- */
- private void commit(Connection connection) throws IOException {
- try {
- if (connection == null || connection.isClosed()) {
- throw new IOException("Trying to commit a connection that is null or closed: "+ connection);
- }
- } catch (SQLException e) {
- throw new IOException("Exception calling isClosed on connection", e);
- }
-
- try {
- LOG.debug("Commit called on task completion");
- connection.commit();
- } catch (SQLException e) {
- throw new IOException("Exception while trying to commit a connection. ", e);
- } finally {
- try {
- LOG.debug("Closing connection to database on task completion");
- connection.close();
- } catch (SQLException e) {
- LOG.warn("Exception while trying to close database connection", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
deleted file mode 100644
index 9c29f8f..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- * {@link OutputFormat} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> {
- private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
-
- private Connection connection;
- private PhoenixPigConfiguration config;
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
- }
-
- /**
- * TODO Implement {@link OutputCommitter} to rollback in case of task failure
- */
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
- return new PhoenixOutputCommitter(this);
- }
-
- @Override
- public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
- try {
- return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config);
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * This method creates a database connection. A single instance is created
- * and passed around for re-use.
- *
- * @param configuration
- * @return
- * @throws IOException
- */
- synchronized Connection getConnection(Configuration configuration) throws IOException {
- if (connection != null) {
- return connection;
- }
-
- config = new PhoenixPigConfiguration(configuration);
- try {
- LOG.info("Initializing new Phoenix connection...");
- connection = config.getConnection();
- LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit());
- return connection;
- } catch (SQLException e) {
- throw new IOException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
deleted file mode 100644
index 5063ed0..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.phoenix.pig.util.TypeUtil;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataType;
-
-import com.google.common.base.Preconditions;
-
-/**
- * A {@link Writable} representing a Phoenix record. This class
- * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement}
- * b) reads the column values from the {@link ResultSet}
- *
- */
-public class PhoenixRecord implements Writable {
-
- private final List<Object> values;
- private final ResourceFieldSchema[] fieldSchemas;
-
- public PhoenixRecord() {
- this(null);
- }
-
- public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) {
- this.values = new ArrayList<Object>();
- this.fieldSchemas = fieldSchemas;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- }
-
- public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException {
- for (int i = 0; i < columnMetadataList.size(); i++) {
- Object o = values.get(i);
- ColumnInfo columnInfo = columnMetadataList.get(i);
- byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
- try {
- Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
- if (upsertValue != null) {
- statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
- } else {
- statement.setNull(i + 1, columnInfo.getSqlType());
- }
- } catch (RuntimeException re) {
- throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s"
- ,columnInfo.toString(),re.getMessage()),re);
-
- }
- }
-
- statement.execute();
- }
-
- public void read(final ResultSet rs, final int noOfColumns) throws SQLException {
- Preconditions.checkNotNull(rs);
- Preconditions.checkArgument(noOfColumns > 0, "No of arguments passed is <= 0");
- values.clear();
- for(int i = 1 ; i <= noOfColumns ; i++) {
- Object obj = rs.getObject(i);
- values.add(obj);
- }
- }
-
- public void add(Object value) {
- values.add(value);
- }
-
- private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
- PDataType pDataType = PDataType.fromTypeId(sqlType);
-
- return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
- }
-
- public List<Object> getValues() {
- return values;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
deleted file mode 100644
index f6808a8..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
-import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-
-/**
- * RecordReader that process the scan and returns PhoenixRecord
- *
- */
-public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
- private final QueryPlan queryPlan;
- private final int columnsCount;
- private NullWritable key = NullWritable.get();
- private PhoenixRecord value = null;
- private ResultIterator resultIterator = null;
- private PhoenixResultSet resultSet;
-
- public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException {
-
- Preconditions.checkNotNull(pConfiguration);
- Preconditions.checkNotNull(qPlan);
- this.phoenixConfiguration = pConfiguration;
- this.queryPlan = qPlan;
- this.columnsCount = phoenixConfiguration.getSelectColumnsCount();
- }
-
- @Override
- public void close() throws IOException {
- if(resultIterator != null) {
- try {
- resultIterator.close();
- } catch (SQLException e) {
- LOG.error(" Error closing resultset.");
- }
- }
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
-
- @Override
- public PhoenixRecord getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
- final List<Scan> scans = pSplit.getScans();
- try {
- List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
- for (Scan scan : scans) {
- final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan);
- PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
- iterators.add(peekingResultIterator);
- }
- ResultIterator iterator = ConcatResultIterator.newIterator(iterators);
- if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) {
- iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager());
- }
- this.resultIterator = iterator;
- this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement());
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
- Throwables.propagate(e);
- }
-
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (key == null) {
- key = NullWritable.get();
- }
- if (value == null) {
- value = new PhoenixRecord();
- }
- Preconditions.checkNotNull(this.resultSet);
- try {
- if(!resultSet.next()) {
- return false;
- }
- value.read(resultSet,columnsCount);
- return true;
- } catch (SQLException e) {
- LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage()));
- Throwables.propagate(e);
- }
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
deleted file mode 100644
index c980a38..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.phoenix.pig.hadoop;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-
-/**
- *
- * {@link RecordWriter} implementation for Phoenix
- *
- *
- *
- */
-public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> {
-
- private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class);
-
- private long numRecords = 0;
-
- private final Connection conn;
- private final PreparedStatement statement;
- private final PhoenixPigConfiguration config;
- private final long batchSize;
-
- public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException {
- this.conn = conn;
- this.config = config;
- this.batchSize = config.getBatchSize();
- this.statement = this.conn.prepareStatement(config.getUpsertStatement());
- }
-
-
- /**
- * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}.
- *
- */
- @Override
- public void close(TaskAttemptContext context) throws IOException, InterruptedException {
- }
-
- @Override
- public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException {
- try {
- record.write(statement, config.getColumnMetadataList());
- numRecords++;
-
- if (numRecords % batchSize == 0) {
- LOG.debug("commit called on a batch of size : " + batchSize);
- conn.commit();
- }
- } catch (SQLException e) {
- throw new IOException("Exception while committing to database.", e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
deleted file mode 100644
index 3ea9b5b..0000000
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import java.util.List;
-
-import org.apache.phoenix.util.ColumnInfo;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- *
- * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back.
- *
- */
-public final class ColumnInfoToStringEncoderDecoder {
-
- private static final String COLUMN_INFO_DELIMITER = "|";
-
- private ColumnInfoToStringEncoderDecoder() {
-
- }
-
- public static String encode(List<ColumnInfo> columnInfos) {
- Preconditions.checkNotNull(columnInfos);
- return Joiner.on(COLUMN_INFO_DELIMITER).
- skipNulls().join(columnInfos);
- }
-
- public static List<ColumnInfo> decode(final String columnInfoStr) {
- Preconditions.checkNotNull(columnInfoStr);
- List<ColumnInfo> columnInfos = Lists.newArrayList(
- Iterables.transform(
- Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr),
- new Function<String, ColumnInfo>() {
- @Override
- public ColumnInfo apply(String colInfo) {
- if (colInfo.isEmpty()) {
- return null;
- }
- return ColumnInfo.fromString(colInfo);
- }
- }));
- return columnInfos;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
index 9f8a5e4..4f7d776 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java
@@ -25,8 +25,9 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
@@ -46,19 +47,20 @@ public final class PhoenixPigSchemaUtil {
private PhoenixPigSchemaUtil() {
}
- public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException {
+public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException {
final ResourceSchema schema = new ResourceSchema();
try {
- List<ColumnInfo> columns = null;
- if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) {
- final String sqlQuery = phoenixConfiguration.getSelectStatement();
- Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
- final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration);
- columns = function.apply(sqlQuery);
- } else {
- columns = phoenixConfiguration.getSelectColumnMetadataList();
- }
+ List<ColumnInfo> columns = null;
+ final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration);
+ if(SchemaType.QUERY.equals(schemaType)) {
+ final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration);
+ Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
+ final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
+ columns = function.apply(sqlQuery);
+ } else {
+ columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration);
+ }
ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()];
int i = 0;
for(ColumnInfo cinfo : columns) {
@@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil {
}
return schema;
-
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
index 1b3a90a..f0148a6 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java
@@ -26,16 +26,16 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
/**
@@ -46,21 +46,20 @@ import com.google.common.collect.Lists;
public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> {
private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class);
- private PhoenixPigConfiguration phoenixConfiguration;
+ private final Configuration configuration;
- public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) {
- Preconditions.checkNotNull(phoenixConfiguration);
- this.phoenixConfiguration = phoenixConfiguration;
+ public QuerySchemaParserFunction(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ this.configuration = configuration;
}
@Override
public Pair<String, String> apply(final String selectStatement) {
Preconditions.checkNotNull(selectStatement);
Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!");
- Preconditions.checkNotNull(this.phoenixConfiguration);
Connection connection = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(selectStatement);
@@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St
return new Pair<String, String>(tableName, columnsAsStr);
} catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error(" Error closing connection ");
+ throw new RuntimeException(sqle);
}
}
}
- return null;
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
index 52f646c..3ed35bb 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java
@@ -26,60 +26,59 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.util.ColumnInfo;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> {
-
- private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
- private final PhoenixPigConfiguration phoenixConfiguration;
+
+ private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class);
+ private final Configuration configuration;
- public SqlQueryToColumnInfoFunction(
- final PhoenixPigConfiguration phoenixPigConfiguration) {
- super();
- this.phoenixConfiguration = phoenixPigConfiguration;
- }
+ public SqlQueryToColumnInfoFunction(final Configuration configuration) {
+ this.configuration = configuration;
+ }
- @Override
- public List<ColumnInfo> apply(String sqlQuery) {
- Preconditions.checkNotNull(sqlQuery);
- Connection connection = null;
- List<ColumnInfo> columnInfos = null;
+ @Override
+ public List<ColumnInfo> apply(String sqlQuery) {
+ Preconditions.checkNotNull(sqlQuery);
+ Connection connection = null;
+ List<ColumnInfo> columnInfos = null;
try {
- connection = this.phoenixConfiguration.getConnection();
+ connection = ConnectionUtil.getConnection(this.configuration);
final Statement statement = connection.createStatement();
final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery);
final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors();
columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size());
columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() {
- @Override
- public ColumnInfo apply(final ColumnProjector columnProjector) {
- return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
- }
-
+ @Override
+ public ColumnInfo apply(final ColumnProjector columnProjector) {
+ return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType());
+ }
+
});
- } catch (SQLException e) {
+ } catch (SQLException e) {
LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery));
- Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
if(connection != null) {
try {
connection.close();
} catch(SQLException sqle) {
- Throwables.propagate(sqle);
+ LOG.error("Error closing connection!!");
+ throw new RuntimeException(sqle);
}
}
}
- return columnInfos;
- }
+ return columnInfos;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index 1cdd66d..1da2d01 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.phoenix.pig.hadoop.PhoenixRecord;
+import org.apache.phoenix.pig.writable.PhoenixPigDBWritable;
import org.apache.phoenix.schema.PDataType;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -240,7 +240,7 @@ public final class TypeUtil {
* @return
* @throws IOException
*/
- public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException {
+ public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException {
List<Object> columnValues = record.getValues();
if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
new file mode 100644
index 0000000..a7399c9
--- /dev/null
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.pig.writable;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.pig.util.TypeUtil;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataType;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A Hadoop {@link DBWritable} holding the column values of a single Phoenix record for Pig.
+ * <p>
+ * On the write path, {@link #write(PreparedStatement)} casts each held value to the Phoenix type
+ * of its target column and binds it to the statement. On the read path,
+ * {@link #readFields(ResultSet)} copies every column of the current {@link ResultSet} row into
+ * this record. This class does not implement Hadoop's {@link Writable}.
+ * <p>
+ * Instances are mutable and perform no synchronization.
+ */
+public class PhoenixPigDBWritable implements DBWritable {
+
+    /** Column values of this record, in column order. */
+    private final List<Object> values;
+    /** Pig schemas giving the type of each value; when {@code null} types are derived from the values. */
+    private ResourceFieldSchema[] fieldSchemas;
+    /** Metadata of the target columns; must be set before {@link #write(PreparedStatement)}. */
+    private List<ColumnInfo> columnMetadataList;
+
+    /** Creates an empty record with neither field schemas nor column metadata set. */
+    public PhoenixPigDBWritable() {
+        this.values = new ArrayList<Object>();
+    }
+
+    /**
+     * Binds the held values to the parameters of {@code statement}, one per column metadata entry.
+     * Each value is first cast to the Phoenix type of its column; a {@code null} result is bound
+     * through {@code setNull}. The statement is only populated here, not executed.
+     *
+     * @param statement statement whose parameters are set, 1-based and in column order
+     * @throws SQLException if setting a parameter fails
+     * @throws IllegalStateException if no column metadata has been set
+     * @throws RuntimeException if a value cannot be processed; the message names the column
+     */
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+        // Fail with a descriptive message rather than a bare NullPointerException when the
+        // instance came from the no-arg constructor and was never given column metadata.
+        Preconditions.checkState(columnMetadataList != null,
+                "Column metadata must be set before calling write()");
+        for (int i = 0; i < columnMetadataList.size(); i++) {
+            Object o = values.get(i);
+            ColumnInfo columnInfo = columnMetadataList.get(i);
+            // Without explicit field schemas, derive the Pig type from the value itself.
+            byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType();
+            try {
+                Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType());
+                if (upsertValue != null) {
+                    statement.setObject(i + 1, upsertValue, columnInfo.getSqlType());
+                } else {
+                    statement.setNull(i + 1, columnInfo.getSqlType());
+                }
+            } catch (RuntimeException re) {
+                // Pass columnInfo itself so a null entry is reported as "null" instead of
+                // raising a second NullPointerException that would hide the original failure.
+                throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s",
+                        columnInfo, re.getMessage()), re);
+            }
+        }
+    }
+
+    /**
+     * Appends a value to this record.
+     *
+     * @param value the next column value
+     */
+    public void add(Object value) {
+        values.add(value);
+    }
+
+    /** Casts {@code o}, of Pig type {@code type}, to the Phoenix type identified by {@code sqlType}. */
+    private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
+        PDataType pDataType = PDataType.fromTypeId(sqlType);
+        return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
+    }
+
+    /**
+     * Returns the values held by this record.
+     *
+     * @return the live backing list, not a copy
+     */
+    public List<Object> getValues() {
+        return values;
+    }
+
+    /**
+     * Replaces the held values with every column of the row {@code rs} is positioned on.
+     *
+     * @param rs result set to read from; must not be {@code null}
+     * @throws SQLException if the metadata or a column value cannot be read
+     */
+    @Override
+    public void readFields(final ResultSet rs) throws SQLException {
+        Preconditions.checkNotNull(rs);
+        final int noOfColumns = rs.getMetaData().getColumnCount();
+        values.clear();
+        for (int i = 1; i <= noOfColumns; i++) {
+            values.add(rs.getObject(i));
+        }
+    }
+
+    /** Returns the Pig field schemas, or {@code null} if none were set. */
+    public ResourceFieldSchema[] getFieldSchemas() {
+        return fieldSchemas;
+    }
+
+    /** Sets the Pig field schemas used to determine the Pig type of each value. */
+    public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) {
+        this.fieldSchemas = fieldSchemas;
+    }
+
+    /** Returns the target column metadata, or {@code null} if none was set. */
+    public List<ColumnInfo> getColumnMetadataList() {
+        return columnMetadataList;
+    }
+
+    /** Sets the target column metadata that {@link #write(PreparedStatement)} iterates over. */
+    public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) {
+        this.columnMetadataList = columnMetadataList;
+    }
+
+    /**
+     * Creates an empty record with the given field schemas and column metadata.
+     *
+     * @param fieldSchemas Pig field schemas, or {@code null} to derive types from the values
+     * @param columnMetadataList metadata of the target columns
+     * @return the new record
+     */
+    public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas,
+            final List<ColumnInfo> columnMetadataList) {
+        final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable();
+        dbWritable.setFieldSchemas(fieldSchemas);
+        dbWritable.setColumnMetadataList(columnMetadataList);
+        return dbWritable;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
deleted file mode 100644
index 0337563..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig;
-
-import static org.junit.Assert.assertEquals;
-
-import java.sql.SQLException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-
-/**
- * Tests for PhoenixPigConfiguration.
- *
- */
-public class PhoenixPigConfigurationTest {
-
-
- @Test
- public void testBasicConfiguration() throws SQLException {
- Configuration conf = new Configuration();
- final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(conf);
- final String zkQuorum = "localhost";
- final String tableName = "TABLE";
- final long batchSize = 100;
- phoenixConfiguration.configure(zkQuorum, tableName, batchSize);
- assertEquals(zkQuorum,phoenixConfiguration.getServer());
- assertEquals(tableName,phoenixConfiguration.getTableName());
- assertEquals(batchSize,phoenixConfiguration.getBatchSize());
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
deleted file mode 100644
index 9777bb5..0000000
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.pig.util;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder;
-import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-/**
- * Tests methods on {@link ColumnInfoToStringEncoderDecoder}
- */
-public class ColumnInfoToStringEncoderDecoderTest {
-
- @Test
- public void testEncode() {
- final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
- assertEquals(columnInfo.toString(),encodedColumnInfo);
- }
-
- @Test
- public void testDecode() {
- final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo));
- assertEquals(columnInfo.toString(),encodedColumnInfo);
- }
-
- @Test
- public void testEncodeDecodeWithNulls() {
- final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType());
- final ColumnInfo columnInfo2 = null;
- final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2));
- final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr);
- assertEquals(1,decodedColumnInfo.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 310128c..7a861b9 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -29,8 +29,10 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
-import org.apache.phoenix.pig.PhoenixPigConfiguration;
-import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.pig.ResourceSchema;
@@ -54,9 +56,11 @@ public class PhoenixPigSchemaUtilTest {
@Test
public void testSchema() throws SQLException, IOException {
- final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+ final Configuration configuration = mock(Configuration.class);
final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,NAME_COLUMN);
- when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SCHEMA_TYPE)).thenReturn(SchemaType.TABLE.name());
final ResourceSchema actual = PhoenixPigSchemaUtil.getResourceSchema(configuration);
// expected schema.
@@ -75,9 +79,10 @@ public class PhoenixPigSchemaUtilTest {
@Test(expected=IllegalDataException.class)
public void testUnSupportedTypes() throws SQLException, IOException {
- final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class);
+ final Configuration configuration = mock(Configuration.class);
final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,LOCATION_COLUMN);
- when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos);
+ final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos);
+ when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos);
PhoenixPigSchemaUtil.getResourceSchema(configuration);
fail("We currently don't support Array type yet. WIP!!");
}