You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/08/24 18:29:25 UTC

phoenix git commit: PHOENIX-953 Support UNNEST for ARRAY

Repository: phoenix
Updated Branches:
  refs/heads/master aa86c899c -> db137dcae


PHOENIX-953 Support UNNEST for ARRAY


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/db137dca
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/db137dca
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/db137dca

Branch: refs/heads/master
Commit: db137dcaef2d5c7e7bfd83b3ee96f62c4c6c5700
Parents: aa86c89
Author: maryannxue <we...@intel.com>
Authored: Mon Aug 24 12:29:14 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Mon Aug 24 12:29:14 2015 -0400

----------------------------------------------------------------------
 .../apache/phoenix/end2end/UnnestArrayIT.java   | 356 +++++++++++++++++++
 .../apache/phoenix/execute/TupleProjector.java  |  10 +
 .../apache/phoenix/execute/UnnestArrayPlan.java | 183 ++++++++++
 .../phoenix/execute/UnnestArrayPlanTest.java    | 162 +++++++++
 4 files changed, 711 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
new file mode 100644
index 0000000..4ca73e6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UnnestArrayIT.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * End-to-end tests for {@code UNNEST} over array literals and array columns, covering
+ * {@code WITH ORDINALITY}, joins between unnested arrays and use inside an {@code IN}
+ * sub-query. The whole class is currently disabled through {@code @Ignore}.
+ */
+@Ignore
+public class UnnestArrayIT extends BaseClientManagedTimeIT {
+
+    /** Tolerance for DOUBLE comparisons; JUnit 4 rejects the delta-less overload outright. */
+    private static final double DOUBLE_DELTA = 1e-9;
+
+    private static long timestamp;
+
+    /** Returns a fresh base timestamp, 100 greater than the previous one. */
+    public static long nextTimestamp() {
+        timestamp += 100;
+        return timestamp;
+    }
+
+    /** Opens a connection whose {@code CURRENT_SCN} property is set to {@code scn}. */
+    private Connection connectAt(long scn) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        return DriverManager.getConnection(getUrl(), props);
+    }
+
+    /** Runs a single DDL statement at {@code scn}, always closing the connection. */
+    private void createTable(long scn, String ddl) throws Exception {
+        Connection conn = connectAt(scn);
+        try {
+            conn.createStatement().execute(ddl);
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** Executes the given UPSERT statements at {@code scn}, commits, and closes the connection. */
+    private void upsert(long scn, String... statements) throws Exception {
+        Connection conn = connectAt(scn);
+        try {
+            for (String sql : statements) {
+                PreparedStatement stmt = conn.prepareStatement(sql);
+                stmt.execute();
+            }
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithIntArray1() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k INTEGER[])");
+        upsert(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[2, 3])");
+
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithIntArray2() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k INTEGER[])");
+        upsert(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[2, 3])", "UPSERT INTO a VALUES (2, ARRAY[4, 5])");
+
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(5, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithVarcharArray1() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k VARCHAR[])");
+        upsert(ts + 30, "UPSERT INTO a VALUES (1, ARRAY['a', 'b', 'c'])");
+
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithDoubleArray1() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k DOUBLE[])");
+        upsert(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[2.3, 3.4, 4.5])");
+
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            // The three-argument overload is required: assertEquals(double, double)
+            // is deprecated in JUnit 4 and fails unconditionally.
+            assertTrue(rs.next());
+            assertEquals(2.3, rs.getDouble(1), DOUBLE_DELTA);
+            assertTrue(rs.next());
+            assertEquals(3.4, rs.getDouble(1), DOUBLE_DELTA);
+            assertTrue(rs.next());
+            assertEquals(4.5, rs.getDouble(1), DOUBLE_DELTA);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithBooleanArray1() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k BOOLEAN[])");
+        upsert(ts + 30, "UPSERT INTO a VALUES (1, ARRAY[true, true, false])");
+
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT t.k FROM UNNEST((SELECT k FROM a)) AS t(k)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertTrue(rs.getBoolean(1));
+            assertTrue(rs.next());
+            assertTrue(rs.getBoolean(1));
+            assertTrue(rs.next());
+            assertFalse(rs.getBoolean(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithOrdinality() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ordinality FROM UNNEST(ARRAY['a','b','c']) WITH ORDINALITY AS t1(ar1, ordinality)");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins1() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST(ARRAY[5,6]) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(5, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals(6, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            // No match on the right side: ResultSet.getInt reports SQL NULL as 0.
+            assertEquals(0, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins2() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) INNER JOIN UNNEST(ARRAY[5,6]) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(5, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals(6, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins3() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY, k VARCHAR[])");
+        upsert(ts + 20, "UPSERT INTO a VALUES (1, ARRAY['a', 'b', 'c'])");
+
+        Connection conn = connectAt(ts + 30);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST((SELECT k FROM a)) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals("b", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertEquals("c", rs.getString(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins4() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAt(ts + 10);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[2,3,4]) WITH ORDINALITY AS t1(ar1, index) FULL OUTER JOIN UNNEST(ARRAY['a','b']) with ORDINALITY AS t2(ar2, index) ON t1.index=t2.index");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals("a", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+            assertEquals("b", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals(4, rs.getInt(1));
+            assertEquals(null, rs.getString(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestWithJoins5() throws Exception {
+        long ts = nextTimestamp();
+        Connection conn = connectAt(ts + 40);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT ar1, ar2 FROM UNNEST(ARRAY[1,2]) AS t1(ar1), UNNEST(ARRAY[2,3]) AS t2(ar2);");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertEquals(3, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(2, rs.getInt(2));
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertEquals(3, rs.getInt(2));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testUnnestInWhere() throws Exception {
+        long ts = nextTimestamp();
+        createTable(ts + 10, "CREATE TABLE a (p INTEGER PRIMARY KEY)");
+        upsert(ts + 20, "UPSERT INTO a VALUES (2)");
+
+        Connection conn = connectAt(ts + 30);
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM a WHERE p IN(SELECT t.a FROM UNNEST(ARRAY[2,3]) AS t(a))");
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index a4728e9..a884949 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -73,6 +73,16 @@ public class TupleProjector {
         schema = builder.build();
         valueSet = ValueBitSet.newInstance(schema);
     }
+
+    public TupleProjector(Expression[] expressions) {
+        this.expressions = expressions;
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        for (int i = 0; i < expressions.length; i++) {
+            builder.addField(expressions[i]);
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
     
     public TupleProjector(PTable projectedTable) {
         Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
new file mode 100644
index 0000000..c4a6b20
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.BaseSingleExpression;
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+
+public class UnnestArrayPlan extends DelegateQueryPlan {
+    private final Expression arrayExpression;
+    private final boolean withOrdinality;
+
+    public UnnestArrayPlan(QueryPlan delegate, Expression arrayExpression, boolean withOrdinality) {
+        super(delegate);
+        this.arrayExpression = arrayExpression;
+        this.withOrdinality = withOrdinality;
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper));
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> planSteps = delegate.getExplainPlan().getPlanSteps();
+        planSteps.add("UNNEST");
+        return new ExplainPlan(planSteps);
+    }
+
+    public class UnnestArrayResultIterator extends DelegateResultIterator {
+        private final UnnestArrayElemRefExpression elemRefExpression;
+        private final UnnestArrayElemIndexExpression elemIndexExpression;
+        private final TupleProjector projector;
+        private Tuple current;
+        private ImmutableBytesWritable arrayPtr;
+        private int length;
+        private int index;
+        private boolean closed;
+
+        public UnnestArrayResultIterator(ResultIterator iterator) {
+            super(iterator);
+            this.elemRefExpression = new UnnestArrayElemRefExpression(arrayExpression);
+            this.elemIndexExpression = withOrdinality ? new UnnestArrayElemIndexExpression() : null;
+            this.projector = new TupleProjector(withOrdinality ? new Expression[] {elemRefExpression, elemIndexExpression} : new Expression[] {elemRefExpression});
+            this.arrayPtr = new ImmutableBytesWritable();
+            this.length = 0;
+            this.index = 0;
+            this.closed = false;
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            if (closed)
+                return null;
+            
+            while (index >= length) {
+                this.current = super.next();
+                if (current == null) {
+                    this.closed = true;
+                    return null;
+                }
+                if (arrayExpression.evaluate(current, arrayPtr)) {
+                    this.length = PArrayDataType.getArrayLength(arrayPtr, elemRefExpression.getDataType(), arrayExpression.getMaxLength());
+                    this.index = 0;
+                    this.elemRefExpression.setArrayPtr(arrayPtr);
+                }
+            }
+            elemRefExpression.setIndex(index);
+            if (elemIndexExpression != null) {
+                elemIndexExpression.setIndex(index);
+            }
+            index++;
+            return projector.projectResults(current);
+        }
+
+        @Override
+        public void close() throws SQLException {
+            super.close();
+            closed = true;
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private static class UnnestArrayElemRefExpression extends BaseSingleExpression {
+        private final PDataType type;
+        private int index = 0;
+        private ImmutableBytesWritable arrayPtr = new ImmutableBytesWritable();
+        
+        public UnnestArrayElemRefExpression(Expression arrayExpression) {
+            super(arrayExpression);
+            this.type = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        }
+        
+        public void setIndex(int index) {
+            this.index = index;
+        }
+        
+        public void setArrayPtr(ImmutableBytesWritable arrayPtr) {
+            this.arrayPtr.set(arrayPtr.get(), arrayPtr.getOffset(), arrayPtr.getLength());
+        }
+        
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            ptr.set(arrayPtr.get(), arrayPtr.getOffset(), arrayPtr.getLength());
+            PArrayDataType.positionAtArrayElement(ptr, index++, getDataType(), getMaxLength());
+            return true;
+        }
+
+        @Override
+        public <T> T accept(ExpressionVisitor<T> visitor) {
+            // This Expression class is only used at runtime.
+            return null;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return type;
+        }
+    }
+    
+    @SuppressWarnings("rawtypes")
+    private static class UnnestArrayElemIndexExpression extends BaseTerminalExpression {
+        private int index = 0;
+        
+        public void setIndex(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+            byte[] lengthBuf = new byte[PInteger.INSTANCE.getByteSize()];
+            PInteger.INSTANCE.getCodec().encodeInt(index, lengthBuf, 0);
+            ptr.set(lengthBuf);
+            return true;
+        }
+
+        @Override
+        public <T> T accept(ExpressionVisitor<T> visitor) {
+            // This Expression class is only used at runtime.
+            return null;
+        }
+
+        @Override
+        public PDataType getDataType() {
+            return PInteger.INSTANCE;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/db137dca/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
new file mode 100644
index 0000000..0def172
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/UnnestArrayPlanTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnImpl;
+import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.apache.phoenix.schema.types.PVarcharArray;
+import org.apache.phoenix.schema.types.PhoenixArray;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+@SuppressWarnings("rawtypes")
+public class UnnestArrayPlanTest {
+    
+    private static final StatementContext context;
+    static {
+        try {
+            PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
+            PhoenixStatement stmt = new PhoenixStatement(connection);
+            ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
+            context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    
+    @Test 
+    public void testUnnestIntegerArrays() throws Exception {
+        testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[] {2, 20}), false);
+    }
+    
+    @Test 
+    public void testUnnestIntegerArraysWithOrdinality() throws Exception {
+        testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[] {2, 20}), true);
+    }
+    
+    @Test 
+    public void testUnnestVarcharArrays() throws Exception {
+        testUnnestArrays(PVarcharArray.INSTANCE, Arrays.asList(new Object[] {"1", "10"}, new Object[] {"2", "20"}), false);
+    }
+    
+    @Test 
+    public void testUnnestVarcharArraysWithOrdinality() throws Exception {
+        testUnnestArrays(PVarcharArray.INSTANCE, Arrays.asList(new Object[] {"1", "10"}, new Object[] {"2", "20"}), true);
+    }
+    
+    @Test 
+    public void testUnnestEmptyArrays() throws Exception {
+        testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[]{}, new Object[] {2, 20}), false);
+    }
+    
+    @Test 
+    public void testUnnestEmptyArraysWithOrdinality() throws Exception {
+        testUnnestArrays(PIntegerArray.INSTANCE, Arrays.asList(new Object[] {1, 10}, new Object[]{}, new Object[] {2, 20}), true);
+    }
+    
+    private void testUnnestArrays(PArrayDataType arrayType, List<Object[]> arrays, boolean withOrdinality) throws Exception {
+        PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        List<Tuple> tuples = toTuples(arrayType, arrays);
+        LiteralResultIterationPlan subPlan = new LiteralResultIterationPlan(tuples.iterator(), context, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF, RowProjector.EMPTY_PROJECTOR, null, OrderBy.EMPTY_ORDER_BY, null);
+        LiteralExpression dummy = LiteralExpression.newConstant(null, arrayType);
+        RowKeyValueAccessor accessor = new RowKeyValueAccessor(Arrays.asList(dummy), 0);
+        UnnestArrayPlan plan = new UnnestArrayPlan(subPlan, new RowKeyColumnExpression(dummy, accessor), withOrdinality);
+        PColumn elemColumn = new PColumnImpl(PNameFactory.newName("ELEM"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), baseType, null, null, true, 0, SortOrder.getDefault(), null, null, false, "");
+        PColumn indexColumn = withOrdinality ? new PColumnImpl(PNameFactory.newName("IDX"), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), PInteger.INSTANCE, null, null, true, 0, SortOrder.getDefault(), null, null, false, "") : null;
+        List<PColumn> columns = withOrdinality ? Arrays.asList(elemColumn, indexColumn) : Arrays.asList(elemColumn);
+        ProjectedColumnExpression elemExpr = new ProjectedColumnExpression(elemColumn, columns, 0, elemColumn.getName().getString());
+        ProjectedColumnExpression indexExpr = withOrdinality ? new ProjectedColumnExpression(indexColumn, columns, 1, indexColumn.getName().getString()) : null;
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        ResultIterator iterator = plan.iterator();
+        for (Object[] o : flatten(arrays)) {
+            Tuple tuple = iterator.next();
+            assertNotNull(tuple);
+            assertTrue(elemExpr.evaluate(tuple, ptr));
+            Object elem = baseType.toObject(ptr);
+            assertEquals(o[0], elem);
+            if (withOrdinality) {
+                assertTrue(indexExpr.evaluate(tuple, ptr));
+                Object index = PInteger.INSTANCE.toObject(ptr);
+                assertEquals(o[1], index);                
+            }
+        }
+        assertNull(iterator.next());
+    }
+    
+    private List<Object[]> flatten(List<Object[]> arrays) {
+        List<Object[]> ret = Lists.newArrayList();
+        for (Object[] array : arrays) {
+            for (int i = 0; i < array.length; i++) {
+                ret.add(new Object[] {array[i], i});
+            }
+        }
+        return ret;
+    }
+    
+    private List<Tuple> toTuples(PArrayDataType arrayType, List<Object[]> arrays) {
+        List<Tuple> tuples = Lists.newArrayListWithExpectedSize(arrays.size());
+        PDataType baseType = PDataType.fromTypeId(arrayType.getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        for (Object[] array : arrays) {
+            PhoenixArray pArray = new PhoenixArray(baseType, array);
+            byte[] bytes = arrayType.toBytes(pArray);            
+            tuples.add(new SingleKeyValueTuple(KeyValueUtil.newKeyValue(bytes, 0, bytes.length, bytes, 0, 0, bytes, 0, 0, 0, bytes, 0, 0)));
+        }
+        
+        return tuples;
+    }
+}