You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/08/24 18:29:25 UTC
phoenix git commit: PHOENIX-953 Support UNNEST for ARRAY
Repository: phoenix
Updated Branches:
refs/heads/master aa86c899c -> db137dcae
PHOENIX-953 Support UNNEST for ARRAY
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/db137dca
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/db137dca
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/db137dca
Branch: refs/heads/master
Commit: db137dcaef2d5c7e7bfd83b3ee96f62c4c6c5700
Parents: aa86c89
Author: maryannxue <we...@intel.com>
Authored: Mon Aug 24 12:29:14 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Mon Aug 24 12:29:14 2015 -0400
----------------------------------------------------------------------
.../apache/phoenix/end2end/UnnestArrayIT.java | 356 +++++++++++++++++++
.../apache/phoenix/execute/TupleProjector.java | 10 +
.../apache/phoenix/execute/UnnestArrayPlan.java | 183 ++++++++++
.../phoenix/execute/UnnestArrayPlanTest.java | 162 +++++++++
4 files changed, 711 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
new file mode 100644
index 0000000..4ca73e6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * End-to-end tests for {@code UNNEST} over ARRAY values: unnesting table columns and
+ * literals, {@code WITH ORDINALITY}, joins between unnested arrays, and use in a
+ * {@code WHERE ... IN} sub-query. The class is currently disabled via {@code @Ignore}.
+ */
+@Ignore
+public class UnnestArrayIT extends BaseClientManagedTimeIT {
+
+    /** Tolerance used when comparing DOUBLE results. */
+    private static final double DELTA = 1e-9;
+
+    private static long timestamp;
+
+    /**
+     * Advances the class-local timestamp counter by 100 and returns the new value.
+     */
+    public static long nextTimestamp() {
+        timestamp += 100;
+        return timestamp;
+    }
+
+    /**
+     * Opens a connection whose {@code CURRENT_SCN} property is set to {@code scn}.
+     * The caller is responsible for closing it.
+     */
+    private Connection connectAtScn(long scn) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    /**
+     * Executes a single DDL statement on a dedicated connection opened at {@code scn}.
+     */
+    private void runDdlAtScn(long scn, String ddl) throws Exception {
+        Connection conn = connectAtScn(scn);
+        try {
+            conn.createStatement().execute(ddl);
+        } finally {
+            conn.close();
+        }
+    }
+
+    /**
+     * Executes the given UPSERT statements in order on one connection opened at
+     * {@code scn}, then commits them together.
+     */
+    private void runUpsertsAtScn(long scn, String... upserts) throws Exception {
+        Connection conn = connectAtScn(scn);
+        try {
+            for (String upsert : upserts) {
+                PreparedStatement stmt = conn.prepareStatement(upsert);
+                stmt.execute();
+            }
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithIntArray1() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k INTEGER[])");
+        runUpsertsAtScn(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[2, 3])");
+
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithIntArray2() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k INTEGER[])");
+        runUpsertsAtScn(ts + 30,
+                "UPSERT INTO a VALUES (1, ARRAY[2, 3])",
+                "UPSERT INTO a VALUES (2, ARRAY[4, 5])");
+
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithVarcharArray1() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k VARCHAR[])");
+        runUpsertsAtScn(ts + 30, "UPSERT INTO a VALUES (1, ARRAY['a', 'b', 'c'])");
+
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithDoubleArray1() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k DOUBLE[])");
+        runUpsertsAtScn(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[2.3, 3.4, 4.5])");
+
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            // The two-argument double overload of assertEquals always fails in JUnit 4,
+            // so the delta overload is required here.
+            assertTrue(rs.next());
+            assertEquals(2.3, rs.getDouble(1), DELTA);
+            assertTrue(rs.next());
+            assertEquals(3.4, rs.getDouble(1), DELTA);
+            assertTrue(rs.next());
+            assertEquals(4.5, rs.getDouble(1), DELTA);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithBooleanArray1() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k BOOLEAN[])");
+        runUpsertsAtScn(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[true, true, false])");
+
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertTrue(rs.getBoolean(1));
+            assertTrue(rs.next());
+            assertTrue(rs.getBoolean(1));
+            assertTrue(rs.next());
+            assertFalse(rs.getBoolean(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithOrdinality() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ordinality FROM UNNEST(ARRAY['a','b','c']) WITH ORDINALITY AS t1(ar1, ordinality)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins1() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST(ARRAY[5,6]) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(5, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals(6, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            // No matching row on the right side: getInt reports the missing value as 0.
+            assertEquals(0, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins2() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) INNER JOIN UNNEST(ARRAY[5,6]) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(5, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals(6, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins3() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k VARCHAR[])");
+        runUpsertsAtScn(ts + 20, "UPSERT INTO a VALUES (1, ARRAY['a', 'b', 'c'])");
+
+        Connection conn = connectAtScn(ts + 30);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST((SELECT k FROM a)) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals("b", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertEquals("c", rs.getString(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins4() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAtScn(ts + 10);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST(ARRAY['a','b']) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals("b", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertEquals(null, rs.getString(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins5() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAtScn(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[1,2]) AS t1(ar1), UNNEST(ARRAY[2,3]) AS t2(ar2);");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(3, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(3, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestInWhere() throws Exception {
+        long ts = nextTimestamp();
+        runDdlAtScn(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY)");
+        runUpsertsAtScn(ts + 20, "UPSERT INTO a VALUES (2)");
+
+        Connection conn = connectAtScn(ts + 30);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM a WHERE p IN(SELECT t.a FROM UNNEST(ARRAY[2,3]) AS t(a))");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index a4728e9..a884949 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -73,6 +73,16 @@ public class TupleProjector {
schema = builder.build();
valueSet = ValueBitSet.newInstance(schema);
}
+
+    /**
+     * Creates a projector that evaluates the given expressions. Each expression is
+     * added, in array order, as a field of a schema built by a
+     * {@code KeyValueSchemaBuilder(0)}; the value bit set is then created from that
+     * schema.
+     *
+     * @param expressions the expressions to project; stored as-is (not copied)
+     */
+    public TupleProjector(Expression[] expressions) {
+        this.expressions = expressions;
+        KeyValueSchemaBuilder schemaBuilder = new KeyValueSchemaBuilder(0);
+        for (Expression expression : expressions) {
+            schemaBuilder.addField(expression);
+        }
+        this.schema = schemaBuilder.build();
+        this.valueSet = ValueBitSet.newInstance(this.schema);
+    }
public TupleProjector(PTable projectedTable) {
Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
new file mode 100644
index 0000000..c4a6b20
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.BaseSingleExpression;
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+
+/**
+ * Query plan that wraps a delegate plan and turns each delegate row into one output
+ * row per element of the array produced by {@code arrayExpression}. When
+ * {@code withOrdinality} is set, each output row also carries the element's position.
+ */
+public class UnnestArrayPlan extends DelegateQueryPlan {
+    private final Expression arrayExpression;
+    private final boolean withOrdinality;
+
+    /**
+     * @param delegate        the plan whose rows supply the arrays
+     * @param arrayExpression expression evaluated against each delegate row to obtain the array
+     * @param withOrdinality  whether to project the element position as a second column
+     */
+    public UnnestArrayPlan(QueryPlan delegate, Expression arrayExpression, boolean withOrdinality) {
+        super(delegate);
+        this.arrayExpression = arrayExpression;
+        this.withOrdinality = withOrdinality;
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper));
+    }
+
+    /**
+     * Returns the delegate's explain plan with an extra {@code "UNNEST"} step appended.
+     */
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        // Work on a copy: the list handed out by the delegate's ExplainPlan may be shared
+        // or unmodifiable, so appending to it directly could corrupt it or throw.
+        List<String> planSteps = new java.util.ArrayList<String>(delegate.getExplainPlan().getPlanSteps());
+        planSteps.add("UNNEST");
+        return new ExplainPlan(planSteps);
+    }
+
+    /**
+     * Iterator that pulls rows from the wrapped iterator and emits one projected tuple
+     * per array element. Rows for which the array expression does not evaluate, and
+     * arrays of length zero, produce no output.
+     */
+    public class UnnestArrayResultIterator extends DelegateResultIterator {
+        private final UnnestArrayElemRefExpression elemRefExpression;
+        private final UnnestArrayElemIndexExpression elemIndexExpression;
+        private final TupleProjector projector;
+        private Tuple current;
+        private ImmutableBytesWritable arrayPtr;
+        private int length;
+        private int index;
+        private boolean closed;
+
+        public UnnestArrayResultIterator(ResultIterator iterator) {
+            super(iterator);
+            this.elemRefExpression = new UnnestArrayElemRefExpression(arrayExpression);
+            this.elemIndexExpression = withOrdinality ? new UnnestArrayElemIndexExpression() : null;
+            this.projector = new TupleProjector(withOrdinality ? new Expression[] {elemRefExpression, elemIndexExpression} : new Expression[] {elemRefExpression});
+            this.arrayPtr = new ImmutableBytesWritable();
+            this.length = 0;
+            this.index = 0;
+            this.closed = false;
+        }
+
+        /**
+         * Returns the projection of the next array element, or {@code null} once the
+         * wrapped iterator is exhausted or this iterator has been closed.
+         */
+        @Override
+        public Tuple next() throws SQLException {
+            if (closed)
+                return null;
+
+            // Current array exhausted (or none loaded yet): advance to the next source
+            // row until one yields an array with at least one element.
+            while (index >= length) {
+                this.current = super.next();
+                if (current == null) {
+                    this.closed = true;
+                    return null;
+                }
+                if (arrayExpression.evaluate(current, arrayPtr)) {
+                    this.length = PArrayDataType.getArrayLength(arrayPtr, elemRefExpression.getDataType(), arrayExpression.getMaxLength());
+                    this.index = 0;
+                    this.elemRefExpression.setArrayPtr(arrayPtr);
+                }
+            }
+            // Point the runtime expressions at the element about to be projected.
+            elemRefExpression.setIndex(index);
+            if (elemIndexExpression != null) {
+                elemIndexExpression.setIndex(index);
+            }
+            index++;
+            return projector.projectResults(current);
+        }
+
+        @Override
+        public void close() throws SQLException {
+            try {
+                super.close();
+            } finally {
+                // Mark closed even if the wrapped iterator fails to close.
+                closed = true;
+            }
+        }
+    }
+
+    /**
+     * Runtime-only expression that positions the output pointer at one element of the
+     * array most recently supplied via {@link #setArrayPtr}. The element is selected by
+     * the index most recently supplied via {@link #setIndex}.
+     */
+    @SuppressWarnings("rawtypes")
+    private static class UnnestArrayElemRefExpression extends BaseSingleExpression {
+        private final PDataType type;
+        private int index = 0;
+        private ImmutableBytesWritable arrayPtr = new ImmutableBytesWritable();
+
+        public UnnestArrayElemRefExpression(Expression arrayExpression) {
+            super(arrayExpression);
+            // The element type id is the array type id minus ARRAY_TYPE_BASE.
+            this.type = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        }
+
+        public void setIndex(int index) {
+            this.index = index;
+        }
+
+        public void setArrayPtr(ImmutableBytesWritable arrayPtr) {
+            this.arrayPtr.set(arrayPtr.get(), arrayPtr.getOffset(), arrayPtr.getLength());
+        }
+
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            ptr.set(arrayPtr.get(), arrayPtr.getOffset(), arrayPtr.getLength());
+            // Evaluation must not advance the index: the iterator sets it before every
+            // projection, and a repeated evaluation has to yield the same element.
+            PArrayDataType.positionAtArrayElement(ptr, index, getDataType(), getMaxLength());
+            return true;
+        }
+
+        @Override
+        public <T> T accept(ExpressionVisitor<T> visitor) {
+            // This Expression class is only used at runtime.
+            return null;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return type;
+        }
+    }
+
+    /**
+     * Runtime-only expression that emits the index most recently supplied via
+     * {@link #setIndex}, encoded as a {@code PInteger}.
+     *
+     * NOTE(review): the iterator passes a zero-based position, so ordinality starts at
+     * 0 here, whereas UnnestArrayIT.testUnnestWithOrdinality expects 1, 2, 3. Confirm
+     * the intended base before enabling that test.
+     */
+    @SuppressWarnings("rawtypes")
+    private static class UnnestArrayElemIndexExpression extends BaseTerminalExpression {
+        private int index = 0;
+
+        public void setIndex(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            // A fresh buffer per call, so previously returned pointers are not overwritten.
+            byte[] lengthBuf = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(index, lengthBuf, 0);
+            ptr.set(lengthBuf);
+            return true;
+        }
+
+        @Override
+        public <T> T accept(ExpressionVisitor<T> visitor) {
+            // This Expression class is only used at runtime.
+            return null;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return PInteger.INSTANCE;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
new file mode 100644
index 0000000..0def172
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.apache.phoenix.schema.types.PVarcharArray;
+import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("rawtypes")
+public class UnnestArrayPlanTest {
+
+ private static final StatementContext context;
+ static {
+ try {
+ PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
+ PhoenixStatement stmt = new PhoenixStatement(connection);
+ ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
+ context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testUnnestIntegerArrays() throws Exception {
+ testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[] {2, 20}), false);
+ }
+
+ @Test
+ public void testUnnestIntegerArraysWithOrdinality() throws Exception {
+ testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[] {2, 20}), true);
+ }
+
+ @Test
+ public void testUnnestVarcharArrays() throws Exception {
+ testUnnestArrays(PVarcharArray.INSTANCE, Arrays.asList(new Object[] {"1", "10"}, new Object[] {"2", "20"}), false);
+ }
+
+ @Test
+ public void testUnnestVarcharArraysWithOrdinality() throws Exception {
+ testUnnestArrays(PVarcharArray.INSTANCE, Arrays.asList(new Object[] {"1", "10"}, new Object[] {"2", "20"}), true);
+ }
+
+ @Test
+ public void testUnnestEmptyArrays() throws Exception {
+ testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[]{}, new Object[] {2, 20}), false);
+ }
+
+ @Test
+ public void testUnnestEmptyArraysWithOrdinality() throws Exception {
+ testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[]{}, new Object[] {2, 20}), true);
+ }
+
+ private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> arrays, boolean withOrdinality) throws Exception {
+ PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+ List<Tuple> tuples = toTuples(arrayType, arrays);
+ LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples.iterator(), context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+ LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
+ RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
+ UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);
+ PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "");
+ PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "") : null;
+ List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn);
+ ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString());
+ ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null;
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ ResultIterator iterator = plan.iterator();
+ for (Object[] o : flatten(arrays)) {
+ Tuple tuple = iterator.next();
+ assertNotNull(tuple);
+ assertTrue(elemExpr.evaluate(tuple, ptr));
+ Object elem = baseType.toObject(ptr);
+ assertEquals(o[0], elem);
+ if (withOrdinality) {
+ assertTrue(indexExpr.evaluate(tuple, ptr));
+ Object index = PInteger.INSTANCE.toObject(ptr);
+ assertEquals(o[1], index);
+ }
+ }
+ assertNull(iterator.next());
+ }
+
+ private List<Object[]> flatten(List<Object[]> arrays) {
+ List<Object[]> ret = Lists.newArrayList();
+ for (Object[] array : arrays) {
+ for (int i = 0; i < array.length; i++) {
+ ret.add(new Object[] {array[i], i});
+ }
+ }
+ return ret;
+ }
+
+ private List<Tuple> toTuples(PArrayDataType arrayType, List<Object[]> arrays) {
+ List<Tuple> tuples = Lists.newArrayListWithExpectedSize(arrays.size());
+ PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+ for (Object[] array : arrays) {
+ PhoenixArray pArray = new PhoenixArray(baseType, array);
+ byte[] bytes = arrayType.toBytes(pArray);
+ tuples.add(new SingleKeyValueTuple(KeyValueUtil.newKeyValue(bytes, 0, bytes.length, bytes, 0, 0, bytes, 0, 0, 0, bytes, 0, 0)));
+ }
+
+ return tuples;
+ }
+}