You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2019/03/18 02:26:53 UTC

[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5148 Improve OrderPreservingTracker to optimize OrderBy/GroupBy for ClientScanPlan and ClientAggregatePlan

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new a34d5f6  PHOENIX-5148 Improve OrderPreservingTracker to optimize OrderBy/GroupBy for ClientScanPlan and ClientAggregatePlan
a34d5f6 is described below

commit a34d5f6336fb99627a9ea46032e3befa2d104767
Author: chenglei <ch...@apache.org>
AuthorDate: Mon Mar 18 10:26:00 2019 +0800

    PHOENIX-5148 Improve OrderPreservingTracker to optimize OrderBy/GroupBy for ClientScanPlan and ClientAggregatePlan
---
 .../org/apache/phoenix/end2end/DerivedTableIT.java |   2 -
 .../java/org/apache/phoenix/end2end/OrderByIT.java | 148 ++++++
 .../phoenix/end2end/SortMergeJoinMoreIT.java       | 126 ++++-
 .../end2end/join/SortMergeJoinGlobalIndexIT.java   |   3 +-
 .../end2end/join/SortMergeJoinLocalIndexIT.java    |   3 +-
 .../end2end/join/SubqueryUsingSortMergeJoinIT.java |  26 +-
 .../apache/phoenix/compile/ExpressionCompiler.java |   6 +-
 .../apache/phoenix/compile/GroupByCompiler.java    |  35 +-
 .../apache/phoenix/compile/ListJarsQueryPlan.java  |   5 +
 .../apache/phoenix/compile/OrderByCompiler.java    | 104 ++--
 .../phoenix/compile/OrderPreservingTracker.java    | 556 +++++++++++++++------
 .../org/apache/phoenix/compile/QueryCompiler.java  |  90 +++-
 .../java/org/apache/phoenix/compile/QueryPlan.java |  20 +-
 .../compile/StatelessExpressionCompiler.java       |  57 +++
 .../org/apache/phoenix/compile/TraceQueryPlan.java |   5 +
 .../org/apache/phoenix/compile/UnionCompiler.java  |   2 +-
 .../org/apache/phoenix/execute/AggregatePlan.java  |  39 +-
 .../phoenix/execute/ClientAggregatePlan.java       |  31 +-
 .../org/apache/phoenix/execute/ClientScanPlan.java |  22 +
 .../apache/phoenix/execute/CursorFetchPlan.java    |   6 +-
 .../apache/phoenix/execute/DelegateQueryPlan.java  |   5 +
 .../execute/LiteralResultIterationPlan.java        |   5 +
 .../java/org/apache/phoenix/execute/ScanPlan.java  |  24 +
 .../apache/phoenix/execute/SortMergeJoinPlan.java  | 115 ++++-
 .../phoenix/execute/TupleProjectionPlan.java       | 112 ++++-
 .../java/org/apache/phoenix/execute/UnionPlan.java |  13 +
 .../apache/phoenix/execute/UnnestArrayPlan.java    |   7 +
 .../phoenix/expression/OrderByExpression.java      |  85 +++-
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  |   5 +
 .../org/apache/phoenix/util/ExpressionUtil.java    | 294 +++++++++++
 .../apache/phoenix/compile/QueryCompilerTest.java  | 451 +++++++++++++++++
 .../phoenix/iterate/OrderedResultIteratorTest.java |   2 +-
 .../phoenix/query/ParallelIteratorsSplitTest.java  |   6 +-
 33 files changed, 2118 insertions(+), 292 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
index a190029..76d206d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
@@ -123,7 +123,6 @@ public class DerivedTableIT extends ParallelStatsDisabledIT {
                 "CLIENT PARALLEL 1-WAY FULL SCAN OVER "+dynamicTableName+"_DERIVED_IDX\n" +
                         "    SERVER AGGREGATE INTO DISTINCT ROWS BY [\"A_STRING\", \"B_STRING\"]\n" +
                         "CLIENT MERGE SORT\n" +
-                        "CLIENT SORTED BY [A]\n" +
                         "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
                         "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]\n" +
                         "CLIENT SORTED BY [A DESC]"}});
@@ -140,7 +139,6 @@ public class DerivedTableIT extends ParallelStatsDisabledIT {
                 "CLIENT PARALLEL 4-WAY FULL SCAN OVER "+dynamicTableName+"\n" +
                         "    SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
                         "CLIENT MERGE SORT\n" +
-                        "CLIENT SORTED BY [A]\n" +
                         "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
                         "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]\n" +
                         "CLIENT SORTED BY [A DESC]"}});
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
index 172ed89..d4d5677 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertResultSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -506,4 +507,151 @@ public class OrderByIT extends BaseOrderByIT {
             }
         }
     }
+
+    @Test
+    public void testOrderPreservingOptimizeBug5148() throws Exception {
+        doTestOrderPreservingOptimizeBug5148(false,false);
+        doTestOrderPreservingOptimizeBug5148(false,true);
+        doTestOrderPreservingOptimizeBug5148(true,false);
+        doTestOrderPreservingOptimizeBug5148(true,true);
+    }
+
+    private void doTestOrderPreservingOptimizeBug5148(boolean desc ,boolean salted) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            String tableName = generateUniqueName();
+            String sql = "create table " + tableName + "( "+
+                    " pk1 varchar not null , " +
+                    " pk2 varchar not null, " +
+                    " pk3 varchar not null," +
+                    " v1 varchar, " +
+                    " v2 varchar, " +
+                    " CONSTRAINT TEST_PK PRIMARY KEY ( "+
+                    "pk1 "+(desc ? "desc" : "")+", "+
+                    "pk2 "+(desc ? "desc" : "")+", "+
+                    "pk3 "+(desc ? "desc" : "")+
+                    " )) "+(salted ? "SALT_BUCKETS =4" : "split on('b')");
+            conn.createStatement().execute(sql);
+
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a11','a12','a13','a14','a15')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a21','a22','a23','a24','a25')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('a31','a32','a33','a34','a35')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b11','b12','b13','b14','b15')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b21','b22','b23','b24','b25')");
+            conn.createStatement().execute("UPSERT INTO "+tableName+" VALUES ('b31','b32','b33','b34','b35')");
+            conn.commit();
+            //TestOrderByOrderPreserving
+            sql = "select pk3 from (select v1,v2,pk3 from " + tableName + " t where pk1 > 'a10' order by t.v2,t.pk3,t.v1 limit 10) a order by v2,pk3";
+            ResultSet rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a13"},{"a23"},{"a33"},{"b13"},{"b23"},{"b33"}});
+
+            sql = "select pk3 from (select v1,v2,pk3 from "+tableName+" t where pk1 > 'a10' order by t.v2 desc,t.pk3 desc,t.v1 desc limit 10) a order by v2 desc ,pk3 desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b33"},{"b23"},{"b13"},{"a33"},{"a23"},{"a13"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3),t.pk2 limit 10) a order by cnt,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,count(pk3) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3),t.pk2 limit 10) a "+
+                  "order by cast(cnt as bigint),substr(sub,0,3)";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3) desc,t.pk2 desc limit 10) a "+
+                  "order by cnt desc,sub desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,count(pk3) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3) desc,t.pk2 desc limit 10) a "+
+                  "order by cast(cnt as bigint) desc,substr(sub,0,3) desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            sql = "select sub,pk2 from (select substr(v2,0,3) sub,pk2 from "+tableName+" t where pk1 > 'a10' group by pk2,v2 limit 10) a order by pk2,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a15","a12"},{"a25","a22"},{"a35","a32"},{"b15","b12"},{"b25","b22"},{"b35","b32"}});
+
+            sql = "select sub,pk2 from (select substr(v2,0,3) sub,pk2 from "+tableName+" t where pk1 > 'a10' group by pk2,v2 limit 10) a order by pk2 desc,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b35","b32"},{"b25","b22"},{"b15","b12"},{"a35","a32"},{"a25","a22"},{"a15","a12"}});
+
+            //test innerQueryPlan is ordered by rowKey
+            sql = "select pk3 from (select pk3,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1,t.pk2 limit 10) a where pk3 > 'a10' order by pk1,pk2,pk3";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a13"},{"a23"},{"a33"},{"b13"},{"b23"},{"b33"}});
+
+            sql = "select sub from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1,t.pk2 limit 10) a where sub > 'a10' order by pk1,pk2,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a13"},{"a23"},{"a33"},{"b13"},{"b23"},{"b33"}});
+
+            sql = "select pk3 from (select pk3,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1 desc,t.pk2 desc limit 10) a where pk3 > 'a10' order by pk1 desc ,pk2 desc ,pk3 desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b33"},{"b23"},{"b13"},{"a33"},{"a23"},{"a13"}});
+
+            sql = "select sub from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1 desc,t.pk2 desc limit 10) a where sub > 'a10' order by pk1 desc,pk2 desc,sub desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b33"},{"b23"},{"b13"},{"a33"},{"a23"},{"a13"}});
+
+            //TestGroupByOrderPreserving
+            sql = "select pk2 from (select v1,pk2,pk1 from "+tableName+" t where pk1 > 'a10' order by t.pk2,t.v1,t.pk1 limit 10) a group by pk2, v1 order by pk2,v1";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select pk2 from (select v1,pk2,pk1 from "+tableName+" t where pk1 > 'a10' order by t.pk2 desc,t.v1 desc,t.pk1 limit 10) a group by pk2, v1 order by pk2 desc,v1 desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3),t.pk2 limit 10) a "+
+                  "group by cnt,sub order by cnt,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select substr(sub,0,3) from (select substr(pk2,0,3) sub,count(pk3) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3),t.pk2 limit 10) a "+
+                  "group by cast(cnt as bigint),substr(sub,0,3) order by cast(cnt as bigint),substr(sub,0,3)";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select sub from (select substr(pk2,0,3) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3) desc,t.pk2 desc limit 10) a "+
+                  "group by cnt,sub order by cnt desc,sub desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            sql = "select substr(sub,0,3) from (select substr(pk2,0,3) sub,count(pk3) cnt from "+tableName+" t where pk1 > 'a10' group by v1 ,pk2 order by count(pk3) desc,t.pk2 desc limit 10) a "+
+                  "group by cast(cnt as bigint),substr(sub,0,3) order by cast(cnt as bigint) desc,substr(sub,0,3) desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            sql = "select pk2 from (select substr(v2,0,3) sub,pk2 from "+tableName+" t where pk1 > 'a10' group by pk2,v2 limit 10) a group by pk2,sub order by pk2,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a12"},{"a22"},{"a32"},{"b12"},{"b22"},{"b32"}});
+
+            sql = "select pk2 from (select substr(v2,0,3) sub,pk2 from "+tableName+" t where pk1 > 'a10' group by pk2,v2 limit 10) a group by pk2,sub order by pk2 desc,sub";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b32"},{"b22"},{"b12"},{"a32"},{"a22"},{"a12"}});
+
+            //test innerQueryPlan is ordered by rowKey
+            sql = "select pk3 from (select pk3,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1,t.pk2 limit 10) a where pk3 > 'a10' group by pk1,pk2,pk3 order by pk1,pk2,pk3";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a13"},{"a23"},{"a33"},{"b13"},{"b23"},{"b33"}});
+
+            sql = "select sub from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1,t.pk2 limit 10) a where sub > 'a10' group by pk1,pk2,sub order by pk1,pk2";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"a13"},{"a23"},{"a33"},{"b13"},{"b23"},{"b33"}});
+
+            sql = "select pk3 from (select pk3,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1 desc,t.pk2 desc limit 10) a where pk3 > 'a10' group by pk1, pk2, pk3 order by pk1 desc ,pk2 desc ,pk3 desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b33"},{"b23"},{"b13"},{"a33"},{"a23"},{"a13"}});
+
+            sql = "select sub from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 > 'a10' order by t.pk1 desc,t.pk2 desc limit 10) a where sub > 'a10' group by pk1,pk2,sub order by pk1 desc,pk2 desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{"b33"},{"b23"},{"b13"},{"a33"},{"a23"},{"a13"}});
+        } finally {
+            if(conn != null) {
+                conn.close();
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
index 2db8e85..bbaa68a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.assertResultSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -406,7 +407,7 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                 stmt.execute();
                 stmt.setLong(2, 1462993430000000000L);
                 stmt.execute();
-                
+                //add order by to make the query result stable
                 String q =
                         "SELECT C.BUCKET, C.TIMESTAMP FROM (\n" +
                         "     SELECT E.BUCKET as BUCKET, L.BUCKET as LBUCKET, E.TIMESTAMP as TIMESTAMP, L.TIMESTAMP as LTIMESTAMP FROM\n" +
@@ -423,7 +424,7 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                         "         ) L\n" +
                         "     ON L.BUCKET = E.BUCKET AND L.TIMESTAMP = E.TIMESTAMP\n" +
                         " ) C\n" +
-                        " GROUP BY C.BUCKET, C.TIMESTAMP";
+                        " GROUP BY C.BUCKET, C.TIMESTAMP ORDER BY C.BUCKET, C.TIMESTAMP";
                 
                 String p = i == 0 ?
                         "SORT-MERGE-JOIN (INNER) TABLES\n" +
@@ -440,8 +441,7 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                         "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "    CLIENT MERGE SORT\n" +
                         "    CLIENT SORTED BY [BUCKET, \"TIMESTAMP\"]\n" +
-                        "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
-                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]"
+                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]"
                         :
                         "SORT-MERGE-JOIN (INNER) TABLES\n" +
                         "    CLIENT PARALLEL 2-WAY SKIP SCAN ON 2 RANGES OVER " + eventCountTableName + " [0,'5SEC',~1462993520000000000,'Tr/Bal'] - [1,'5SEC',~1462993420000000000,'Tr/Bal']\n" +
@@ -456,8 +456,7 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                         "        SERVER DISTINCT PREFIX FILTER OVER [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "        SERVER AGGREGATE INTO ORDERED DISTINCT ROWS BY [BUCKET, \"TIMESTAMP\", SRC_LOCATION, DST_LOCATION]\n" +
                         "    CLIENT MERGE SORT\n" +
-                        "CLIENT SORTED BY [E.TIMESTAMP, E.BUCKET]\n" +
-                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.TIMESTAMP, E.BUCKET]";
+                        "CLIENT AGGREGATE INTO DISTINCT ROWS BY [E.BUCKET, E.TIMESTAMP]";
                 
                 ResultSet rs = conn.createStatement().executeQuery("explain " + q);
                 assertEquals(p, QueryUtil.getExplainPlan(rs));
@@ -465,34 +464,34 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
                 rs = conn.createStatement().executeQuery(q);
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993520000000000L, rs.getLong(2));
+                assertEquals(1462993430000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993515000000000L, rs.getLong(2));
+                assertEquals(1462993470000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993510000000000L, rs.getLong(2));
+                assertEquals(1462993475000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993505000000000L, rs.getLong(2));
+                assertEquals(1462993480000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993490000000000L, rs.getLong(2));
+                assertEquals(1462993485000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993485000000000L, rs.getLong(2));
+                assertEquals(1462993490000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993480000000000L, rs.getLong(2));
+                assertEquals(1462993505000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993475000000000L, rs.getLong(2));
+                assertEquals(1462993510000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993470000000000L, rs.getLong(2));
+                assertEquals(1462993515000000000L, rs.getLong(2));
                 assertTrue(rs.next());
                 assertEquals("5SEC", rs.getString(1));
-                assertEquals(1462993430000000000L, rs.getLong(2));
+                assertEquals(1462993520000000000L, rs.getLong(2));
                 assertFalse(rs.next());
             }
         } finally {
@@ -851,4 +850,99 @@ public class SortMergeJoinMoreIT extends ParallelStatsDisabledIT {
             }
         }
     }
+
+    @Test
+    public void testOrderPreservingForSortMergeJoinBug5148() throws Exception {
+        doTestOrderPreservingForSortMergeJoinBug5148(false, false);
+        doTestOrderPreservingForSortMergeJoinBug5148(false, true);
+        doTestOrderPreservingForSortMergeJoinBug5148(true, false);
+        doTestOrderPreservingForSortMergeJoinBug5148(true, true);
+    }
+
+    private void doTestOrderPreservingForSortMergeJoinBug5148(boolean desc, boolean salted) throws Exception {
+        Connection conn = null;
+        try {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+
+            String tableName1 = generateUniqueName();
+            String tableName2 = generateUniqueName();
+
+            String sql = "CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
+                    "AID INTEGER PRIMARY KEY "+(desc ? "desc" : "")+","+
+                    "AGE INTEGER"+
+                    ") "+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (1,11)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (2,22)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (3,33)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (4,44)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (5,55)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (6,66)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (7,77)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (8,88)");
+            conn.createStatement().execute("UPSERT INTO "+tableName1+"(AID,AGE) VALUES (9,99)");
+            conn.commit();
+
+            sql = "CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+
+                    "BID INTEGER PRIMARY KEY "+(desc ? "desc" : "")+","+
+                    "CODE INTEGER"+
+                    ")"+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (1,22)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (2,22)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (3,44)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (4,44)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (5,66)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (6,66)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (7,88)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (8,88)");
+            conn.createStatement().execute("UPSERT INTO "+tableName2+"(BID,CODE) VALUES (9,00)");
+            conn.commit();
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code order by a.aid ,a.age";
+            ResultSet rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{2,22},{4,44},{6,66},{8,88}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code order by a.aid desc,a.age desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{8,88},{6,66},{4,44},{2,22}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,a.age from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code group by a.aid,a.age order by a.aid ,a.age";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{2,22},{4,44},{6,66},{8,88}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,a.age from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code group by a.aid,a.age order by a.aid desc,a.age desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{8,88},{6,66},{4,44},{2,22}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code order by b.bid ,b.code";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{2,22},{4,44},{6,66},{8,88}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code order by b.bid desc ,b.code desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{8,88},{6,66},{4,44},{2,22}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code group by b.bid, b.code order by b.bid ,b.code";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{2,22},{4,44},{6,66},{8,88}});
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=99 order by age limit 10) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 10) b on a.aid=b.bid and a.age = b.code group by b.bid, b.code order by b.bid desc,b.code desc";
+            rs = conn.prepareStatement(sql).executeQuery();
+            assertResultSet(rs, new Object[][]{{8,88},{6,66},{4,44},{2,22}});
+        } finally {
+            if(conn!=null) {
+                conn.close();
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinGlobalIndexIT.java
index ce6f032..297f970 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinGlobalIndexIT.java
@@ -76,8 +76,7 @@ public class SortMergeJoinGlobalIndexIT extends SortMergeJoinIT {
                 "    CLIENT PARALLEL 1-WAY FULL SCAN OVER Join.idx_item\n" +
                 "        SERVER FILTER BY FIRST KEY ONLY\n" +
                 "        SERVER SORTED BY [\"I2.:item_id\"]\n" +
-                "    CLIENT MERGE SORT\n" +
-                "CLIENT SORTED BY [\"I1.:item_id\"]"
+                "    CLIENT MERGE SORT"
                 }});
         return testCases;
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinLocalIndexIT.java
index 919aa69..5e5708a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinLocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinLocalIndexIT.java
@@ -76,8 +76,7 @@ public class SortMergeJoinLocalIndexIT extends SortMergeJoinIT {
                 "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_ITEM_TABLE_FULL_NAME + " [1]\n" +
                 "        SERVER FILTER BY FIRST KEY ONLY\n" +
                 "        SERVER SORTED BY [\"I2.:item_id\"]\n" +
-                "    CLIENT MERGE SORT\n" +
-                "CLIENT SORTED BY [\"I1.:item_id\"]"
+                "    CLIENT MERGE SORT"
                 }});
         return testCases;
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SubqueryUsingSortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SubqueryUsingSortMergeJoinIT.java
index 665908f..4e9d5ca 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SubqueryUsingSortMergeJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SubqueryUsingSortMergeJoinIT.java
@@ -32,6 +32,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
+import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.junit.Test;
@@ -516,8 +522,8 @@ public class SubqueryUsingSortMergeJoinIT extends BaseJoinIT {
             assertEquals(rs.getString(2), "T6");
 
             assertFalse(rs.next());
-            
-            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", name FROM " + tableName4 + " o JOIN " + tableName1 + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ANY(SELECT quantity FROM " + tableName4 + " q WHERE o.\"item_id\" = q.\"item_id\" GROUP BY quantity)";
+            //add order by to make the query result stable
+            query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", name FROM " + tableName4 + " o JOIN " + tableName1 + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ANY(SELECT quantity FROM " + tableName4 + " q WHERE o.\"item_id\" = q.\"item_id\" GROUP BY quantity) order by \"order_id\"";
             statement = conn.prepareStatement(query);
             rs = statement.executeQuery();
             assertTrue (rs.next());
@@ -528,6 +534,22 @@ public class SubqueryUsingSortMergeJoinIT extends BaseJoinIT {
             assertEquals(rs.getString(2), "T6");
 
             assertFalse(rs.next());
+
+            PhoenixPreparedStatement phoenixPreparedStatement = statement.unwrap(PhoenixPreparedStatement.class);
+            ClientScanPlan clientScanPlan =(ClientScanPlan)phoenixPreparedStatement.optimizeQuery(query);
+            SortMergeJoinPlan sortMergeJoin = (SortMergeJoinPlan)clientScanPlan.getDelegate();
+            ClientScanPlan lhsQueryPlan = (ClientScanPlan)sortMergeJoin.getLhsPlan();
+            /**
+             * Verify that the orderBy on the lhs of the final SortMergeJoinPlan is avoided.
+             */
+            assertTrue(lhsQueryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            TupleProjectionPlan rhsQueryPlan = (TupleProjectionPlan)sortMergeJoin.getRhsPlan();
+            ClientAggregatePlan clientAggregatePlan = (ClientAggregatePlan)rhsQueryPlan.getDelegate();
+            /**
+             * Verify that the groupBy and orderBy on the rhs of the final SortMergeJoinPlan are avoided.
+             */
+            assertTrue(clientAggregatePlan.getGroupBy().isOrderPreserving());
+            assertTrue(clientAggregatePlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
         } finally {
             conn.close();
         }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
index 077e1af..c0d5bcb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java
@@ -60,7 +60,6 @@ import org.apache.phoenix.expression.LongSubtractExpression;
 import org.apache.phoenix.expression.ModulusExpression;
 import org.apache.phoenix.expression.NotExpression;
 import org.apache.phoenix.expression.OrExpression;
-import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
 import org.apache.phoenix.expression.StringBasedLikeExpression;
 import org.apache.phoenix.expression.StringConcatExpression;
@@ -117,9 +116,7 @@ import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
@@ -301,8 +298,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio
             int index = groupBy.getExpressions().indexOf(expression);
             if (index >= 0) {
                 isAggregate = true;
-                RowKeyValueAccessor accessor = new RowKeyValueAccessor(groupBy.getKeyExpressions(), index);
-                expression = new RowKeyColumnExpression(expression, accessor, groupBy.getKeyExpressions().get(index).getDataType());
+                expression = ExpressionUtil.convertGroupByExpressionToRowKeyColumnExpression(groupBy, expression, index);
             }
         }
         return expression;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index 2bdea9a..c47e9a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -26,11 +26,11 @@ import java.util.List;
 import net.jcip.annotations.Immutable;
 
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.OrderPreservingTracker.Info;
 import org.apache.phoenix.compile.OrderPreservingTracker.Ordering;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.parse.AliasedNode;
@@ -63,9 +63,10 @@ public class GroupByCompiler {
         private final boolean isOrderPreserving;
         private final int orderPreservingColumnCount;
         private final boolean isUngroupedAggregate;
+        private final List<Info> orderPreservingTrackInfos;
         public static final GroupByCompiler.GroupBy EMPTY_GROUP_BY = new GroupBy(new GroupByBuilder()) {
             @Override
-            public GroupBy compile(StatementContext context, TupleProjector tupleProjector) throws SQLException {
+            public GroupBy compile(StatementContext context, QueryPlan innerQueryPlan, Expression whereExpression) throws SQLException {
                 return this;
             }
             
@@ -80,7 +81,7 @@ public class GroupByCompiler {
         };
         public static final GroupByCompiler.GroupBy UNGROUPED_GROUP_BY = new GroupBy(new GroupByBuilder().setIsOrderPreserving(true).setIsUngroupedAggregate(true)) {
             @Override
-            public GroupBy compile(StatementContext context, TupleProjector tupleProjector) throws SQLException {
+            public GroupBy compile(StatementContext context, QueryPlan innerQueryPlan, Expression whereExpression) throws SQLException {
                 return this;
             }
 
@@ -103,6 +104,7 @@ public class GroupByCompiler {
             this.isOrderPreserving = builder.isOrderPreserving;
             this.orderPreservingColumnCount = builder.orderPreservingColumnCount;
             this.isUngroupedAggregate = builder.isUngroupedAggregate;
+            this.orderPreservingTrackInfos = builder.orderPreservingTrackInfos;
         }
         
         public List<Expression> getExpressions() {
@@ -138,8 +140,12 @@ public class GroupByCompiler {
         public int getOrderPreservingColumnCount() {
             return orderPreservingColumnCount;
         }
-        
-        public GroupBy compile(StatementContext context, TupleProjector tupleProjector) throws SQLException {
+
+        public List<Info> getOrderPreservingTrackInfos() {
+            return orderPreservingTrackInfos;
+        }
+
+        public GroupBy compile(StatementContext context, QueryPlan innerQueryPlan, Expression whereExpression) throws SQLException {
             boolean isOrderPreserving = this.isOrderPreserving;
             int orderPreservingColumnCount = 0;
             if (isOrderPreserving) {
@@ -148,8 +154,9 @@ public class GroupByCompiler {
                         GroupBy.EMPTY_GROUP_BY,
                         Ordering.UNORDERED,
                         expressions.size(),
-                        tupleProjector,
-                        null);
+                        null,
+                        innerQueryPlan,
+                        whereExpression);
                 for (int i = 0; i < expressions.size(); i++) {
                     Expression expression = expressions.get(i);
                     tracker.track(expression);
@@ -162,13 +169,15 @@ public class GroupByCompiler {
                 orderPreservingColumnCount = tracker.getOrderPreservingColumnCount();
                 if(isOrderPreserving) {
                     //reorder the groupby expressions following pk columns
-                    List<Expression> newExpressions = tracker.getExpressionsFromOrderPreservingTrackInfos();
+                    List<Info> orderPreservingTrackInfos = tracker.getOrderPreservingTrackInfos();
+                    List<Expression> newExpressions = Info.extractExpressions(orderPreservingTrackInfos);
                     assert newExpressions.size() == expressions.size();
                     return new GroupBy.GroupByBuilder(this)
                                .setIsOrderPreserving(isOrderPreserving)
                                .setOrderPreservingColumnCount(orderPreservingColumnCount)
                                .setExpressions(newExpressions)
                                .setKeyExpressions(newExpressions)
+                               .setOrderPreservingTrackInfos(orderPreservingTrackInfos)
                                .build();
                 }
             }
@@ -277,6 +286,7 @@ public class GroupByCompiler {
             private List<Expression> expressions = Collections.emptyList();
             private List<Expression> keyExpressions = Collections.emptyList();
             private boolean isUngroupedAggregate;
+            private List<Info> orderPreservingTrackInfos = Collections.emptyList();
 
             public GroupByBuilder() {
             }
@@ -314,6 +324,11 @@ public class GroupByCompiler {
                 return this;
             }
 
+            public GroupByBuilder setOrderPreservingTrackInfos(List<Info> orderPreservingTrackInfos) {
+                this.orderPreservingTrackInfos = orderPreservingTrackInfos;
+                return this;
+            }
+
             public GroupBy build() {
                 return new GroupBy(this);
             }
@@ -338,7 +353,7 @@ public class GroupByCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
-    public static GroupBy compile(StatementContext context, SelectStatement statement, boolean isOrderPreserving) throws SQLException {
+    public static GroupBy compile(StatementContext context, SelectStatement statement) throws SQLException {
         List<ParseNode> groupByNodes = statement.getGroupBy();
         /**
          * Distinct can use an aggregate plan if there's no group by.
@@ -399,7 +414,7 @@ public class GroupByCompiler {
             return GroupBy.EMPTY_GROUP_BY;
         }
         GroupBy groupBy = new GroupBy.GroupByBuilder()
-                .setIsOrderPreserving(isOrderPreserving)
+                .setIsOrderPreserving(OrderByCompiler.isTrackOrderByPreserving(statement))
                 .setExpressions(expressions).setKeyExpressions(expressions)
                 .setIsUngroupedAggregate(isUngroupedAggregate).build();
         return groupBy;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 5d7ace3..d81e6b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -281,4 +281,9 @@ public class ListJarsQueryPlan implements QueryPlan {
     public Long getEstimateInfoTimestamp() throws SQLException {
         return 0l;
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return Collections.<OrderBy> emptyList();
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 3c3f429..712663c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.compile;
 
 
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -27,9 +28,9 @@ import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderPreservingTracker.Ordering;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.LiteralParseNode;
 import org.apache.phoenix.parse.OrderByNode;
@@ -37,9 +38,7 @@ import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.collect.ImmutableList;
@@ -64,13 +63,46 @@ public class OrderByCompiler {
         
         private final List<OrderByExpression> orderByExpressions;
         
-        private OrderBy(List<OrderByExpression> orderByExpressions) {
+        public OrderBy(List<OrderByExpression> orderByExpressions) {
             this.orderByExpressions = ImmutableList.copyOf(orderByExpressions);
         }
 
         public List<OrderByExpression> getOrderByExpressions() {
             return orderByExpressions;
         }
+
+        public boolean isEmpty() {
+            return this.orderByExpressions == null || this.orderByExpressions.isEmpty();
+        }
+
+        public static List<OrderBy> wrapForOutputOrderBys(OrderBy orderBy) {
+            assert orderBy != OrderBy.FWD_ROW_KEY_ORDER_BY && orderBy != OrderBy.REV_ROW_KEY_ORDER_BY;
+            if(orderBy == null || orderBy == OrderBy.EMPTY_ORDER_BY) {
+                return Collections.<OrderBy> emptyList();
+            }
+            return Collections.<OrderBy> singletonList(orderBy);
+        }
+
+        /**
+         * When we compile {@link OrderByNode} in {@link OrderByCompiler#compile}, we invoke {@link OrderByExpression#createByCheckIfExpressionSortOrderDesc}
+         * to get the compiled {@link OrderByExpression} used by {@link OrderedResultIterator}. For {@link QueryPlan#getOutputOrderBys()}, however,
+         * the returned {@link OrderByExpression} is consumed by {@link OrderPreservingTracker}, so each expression is converted once more
+         * (see {@link OrderByExpression#convertIfExpressionSortOrderDesc}) to get the actual {@link OrderByExpression}.
+         * @return the converted {@link OrderBy}, or the given {@code orderBy} itself when it is empty
+         */
+        public static OrderBy convertCompiledOrderByToOutputOrderBy(OrderBy orderBy) {
+            if(orderBy.isEmpty()) {
+                return orderBy;
+            }
+            List<OrderByExpression> orderByExpressions = orderBy.getOrderByExpressions();
+            List<OrderByExpression> newOrderByExpressions = new ArrayList<OrderByExpression>(orderByExpressions.size());
+            for(OrderByExpression orderByExpression : orderByExpressions) {
+                OrderByExpression newOrderByExpression =
+                        OrderByExpression.convertIfExpressionSortOrderDesc(orderByExpression);
+                newOrderByExpressions.add(newOrderByExpression);
+            }
+            return new OrderBy(newOrderByExpressions);
+        }
     }
     /**
      * Gets a list of columns in the ORDER BY clause
@@ -84,11 +116,11 @@ public class OrderByCompiler {
      */
     public static OrderBy compile(StatementContext context,
                                   SelectStatement statement,
-                                  GroupBy groupBy, Integer limit,
+                                  GroupBy groupBy,
+                                  Integer limit,
                                   Integer offset,
                                   RowProjector rowProjector,
-                                  TupleProjector tupleProjector,
-                                  boolean isInRowKeyOrder,
+                                  QueryPlan innerQueryPlan,
                                   Expression whereExpression) throws SQLException {
         List<OrderByNode> orderByNodes = statement.getOrderBy();
         if (orderByNodes.isEmpty()) {
@@ -97,31 +129,23 @@ public class OrderByCompiler {
         // for ungroupedAggregates as GROUP BY expression, check against an empty group by
         ExpressionCompiler compiler;
         if (groupBy.isUngroupedAggregate()) {
-            compiler = new ExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY) {
-                @Override
-                protected Expression addExpression(Expression expression) {return expression;}
-                @Override
-                protected void addColumn(PColumn column) {}
-            };
+            compiler = new StatelessExpressionCompiler(context, GroupBy.EMPTY_GROUP_BY);
         } else {
             compiler = new ExpressionCompiler(context, groupBy);
         }
-
-        if(groupBy != GroupBy.EMPTY_GROUP_BY) {
-            //if there is groupBy,the groupBy.expressions are viewed as new rowKey columns,so
-            //tupleProjector and isInRowKeyOrder is cleared
-            tupleProjector = null;
-            isInRowKeyOrder = true;
+        OrderPreservingTracker tracker = null;
+        if(isTrackOrderByPreserving(statement)) {
+            // accumulate columns in ORDER BY
+            tracker = new OrderPreservingTracker(
+                            context,
+                            groupBy,
+                            Ordering.ORDERED,
+                            orderByNodes.size(),
+                            null,
+                            innerQueryPlan,
+                            whereExpression);
         }
-        // accumulate columns in ORDER BY
-        OrderPreservingTracker tracker = 
-                new OrderPreservingTracker(
-                        context,
-                        groupBy,
-                        Ordering.ORDERED,
-                        orderByNodes.size(),
-                        tupleProjector,
-                        whereExpression);
+
         LinkedHashSet<OrderByExpression> orderByExpressions = Sets.newLinkedHashSetWithExpectedSize(orderByNodes.size());
         for (OrderByNode node : orderByNodes) {
             ParseNode parseNode = node.getNode();
@@ -151,13 +175,19 @@ public class OrderByCompiler {
             if (!expression.isStateless()) {
                 boolean isAscending = node.isAscending();
                 boolean isNullsLast = node.isNullsLast();
-                tracker.track(expression, isAscending ? SortOrder.ASC : SortOrder.DESC, isNullsLast);
-                // If we have a schema where column A is DESC, reverse the sort order and nulls last
-                // since this is the order they actually are in.
-                if (expression.getSortOrder() == SortOrder.DESC) {
-                    isAscending = !isAscending;
+                if(tracker != null) {
+                    tracker.track(expression, isAscending, isNullsLast);
                 }
-                OrderByExpression orderByExpression = new OrderByExpression(expression, isNullsLast, isAscending);
+                /**
+                 * If we have a schema where column A is DESC, reverse the sort order
+                 * since this is the order they actually are in.
+                 * The reversal is required because the compiled OrderByExpression is used in {@link OrderedResultIterator},
+                 * which implements the comparison based on the binary representation, not on the decoded value of the corresponding data type.
+                 */
+                OrderByExpression orderByExpression = OrderByExpression.createByCheckIfExpressionSortOrderDesc(
+                        expression,
+                        isNullsLast,
+                        isAscending);
                 orderByExpressions.add(orderByExpression);
             }
             compiler.reset();
@@ -167,7 +197,7 @@ public class OrderByCompiler {
             return OrderBy.EMPTY_ORDER_BY;
         }
         // If we're ordering by the order returned by the scan, we don't need an order by
-        if (isInRowKeyOrder && tracker.isOrderPreserving()) {
+        if (tracker != null && tracker.isOrderPreserving()) {
             if (tracker.isReverse()) {
                 // Don't use reverse scan if:
                 // 1) we're using a skip scan, as our skip scan doesn't support this yet.
@@ -189,6 +219,10 @@ public class OrderByCompiler {
         return new OrderBy(Lists.newArrayList(orderByExpressions.iterator()));
     }
 
+    public static boolean isTrackOrderByPreserving(SelectStatement selectStatement) {
+        return !selectStatement.isUnion();
+    }
+
     private OrderByCompiler() {
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderPreservingTracker.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderPreservingTracker.java
index 29b3794..0435148 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderPreservingTracker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderPreservingTracker.java
@@ -9,18 +9,25 @@
  */
 package org.apache.phoenix.compile;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.RowValueConstructorExpression;
@@ -28,14 +35,17 @@ import org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving
 import org.apache.phoenix.expression.function.ScalarFunction;
 import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.ExpressionUtil;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 /**
+ * <pre>
  * Determines if the natural key order of the rows returned by the scan
  * will match the order of the expressions. For GROUP BY, if order is preserved we can use
  * an optimization during server-side aggregation to do the aggregation on-the-fly versus
@@ -43,203 +53,390 @@ import com.google.common.collect.Lists;
  * for each group will be contiguous. For ORDER BY, we can drop the ORDER BY statement if
  * the order is preserved.
  * 
+ * PHOENIX-5148 refactored this class with three main changes:
+ * 1.Added a {@link #getInputOrderBys} method to determine the input OrderBys from the combination
+ *   of the innerQueryPlan's output OrderBys, the GroupBy of the current QueryPlan and the rowKeyColumns of the current table.
+ *
+ * 2.Because the innerQueryPlan may have multiple output OrderBys (for {@link SortMergeJoinPlan}),
+ *   many stateful member variables (such as orderPreservingTrackInfos, isOrderPreserving
+ *   and isReverse etc.) were extracted to a new inner class {@link TrackOrderByContext}. A {@link TrackOrderByContext}
+ *   tracks whether a single input OrderBy matches the target OrderByExpressions in the {@link #isOrderPreserving()}
+ *   method. Once a {@link TrackOrderByContext} that satisfies {@link #isOrderPreserving()} is found,
+ *   the {@link #selectedTrackOrderByContext} member variable is set to that {@link TrackOrderByContext},
+ *   and {@link #selectedTrackOrderByContext} is then used to implement the {@link #getOrderPreservingTrackInfos()}
+ *   and {@link #isReverse()} methods etc.
+ *   Note that at most one {@link TrackOrderByContext} can satisfy {@link #isOrderPreserving()}.
+ *
+ * 3.Added ascending and nullsLast to the inner class {@link Info}, and the inner {@link TrackOrderPreservingExpressionVisitor}
+ *   class now extracts the complete ordering information into {@link Info},
+ *   so the alignment between the target OrderByExpressions and the input OrderBys can be inferred based on
+ *   {@link Info} in the {@link TrackOrderByContext#doTrack} method, not on the row keys as the original
+ *   {@link #track} method did.
+ * </pre>
  */
 public class OrderPreservingTracker {
     public enum Ordering {ORDERED, UNORDERED};
 
     public static class Info {
-        public final OrderPreserving orderPreserving;
-        public final int pkPosition;
-        public final int slotSpan;
-        public Expression expression;
+        private final OrderPreserving orderPreserving;
+        private final int pkPosition;
+        private final int slotSpan;
+        private final boolean ascending;
+        private final boolean nullsLast;
+        private Expression expression;
 
-        public Info(int pkPosition) {
+        public Info(int pkPosition, boolean ascending, boolean nullsLast) {
             this.pkPosition = pkPosition;
             this.orderPreserving = OrderPreserving.YES;
             this.slotSpan = 1;
+            this.ascending = ascending;
+            this.nullsLast = nullsLast;
         }
 
-        public Info(Info info, OrderPreserving orderPreserving) {
-            this.pkPosition = info.pkPosition;
-            this.slotSpan = info.slotSpan;
+        public Info(int rowKeyColumnPosition, int rowKeySlotSpan, OrderPreserving orderPreserving, boolean ascending, boolean nullsLast) {
+            this.pkPosition = rowKeyColumnPosition;
+            this.slotSpan = rowKeySlotSpan;
             this.orderPreserving = orderPreserving;
+            this.ascending = ascending;
+            this.nullsLast = nullsLast;
         }
 
-        public Info(Info info, int slotSpan, OrderPreserving orderPreserving) {
-            this.pkPosition = info.pkPosition;
-            this.slotSpan = slotSpan;
-            this.orderPreserving = orderPreserving;
+        public static List<Expression> extractExpressions(List<Info> orderPreservingTrackInfos) {
+            List<Expression> newExpressions = new ArrayList<Expression>(orderPreservingTrackInfos.size());
+            for(Info trackInfo : orderPreservingTrackInfos) {
+                newExpressions.add(trackInfo.expression);
+            }
+            return newExpressions;
+        }
+
+        public Expression getExpression() {
+            return expression;
+        }
+
+        public boolean isAscending() {
+            return ascending;
+        }
+
+        public boolean isNullsLast() {
+            return nullsLast;
         }
     }
     private final StatementContext context;
-    private final TrackOrderPreservingExpressionVisitor visitor;
     private final GroupBy groupBy;
     private final Ordering ordering;
-    private final int pkPositionOffset;
-    private final List<Info> orderPreservingInfos;
-    private boolean isOrderPreserving = true;
-    private Boolean isReverse = null;
-    private int orderPreservingColumnCount = 0;
+    private int pkPositionOffset = 0;
     private Expression whereExpression;
-    
-    public OrderPreservingTracker(StatementContext context, GroupBy groupBy, Ordering ordering, int nNodes) {
-        this(context, groupBy, ordering, nNodes, null, null);
+    private List<TrackOrderByCell> trackOrderByCells = new LinkedList<TrackOrderByCell>();
+    private List<TrackOrderByContext> trackOrderByContexts = Collections.<TrackOrderByContext> emptyList();
+    private TrackOrderByContext selectedTrackOrderByContext = null;
+    private List<OrderBy> inputOrderBys = Collections.<OrderBy> emptyList();
+
+    public OrderPreservingTracker(StatementContext context, GroupBy groupBy, Ordering ordering, int nNodes) throws SQLException {
+        this(context, groupBy, ordering, nNodes, null, null, null);
     }
-    
+
     public OrderPreservingTracker(
             StatementContext context,
             GroupBy groupBy,
             Ordering ordering,
             int nNodes,
-            TupleProjector projector,
-            Expression whereExpression) {
+            List<OrderBy> inputOrderBys,
+            QueryPlan innerQueryPlan,
+            Expression whereExpression) throws SQLException {
+
         this.context = context;
-        if (groupBy.isEmpty()) {
+        boolean isOrderPreserving = false;
+        if (groupBy.isEmpty() && inputOrderBys == null) {
             PTable table = context.getResolver().getTables().get(0).getTable();
-            this.isOrderPreserving = table.rowKeyOrderOptimizable();
-            boolean isSalted = table.getBucketNum() != null;
-            boolean isMultiTenant = context.getConnection().getTenantId() != null && table.isMultiTenant();
-            boolean isSharedViewIndex = table.getViewIndexId() != null;
-            // TODO: util for this offset, as it's computed in numerous places
-            this.pkPositionOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+            isOrderPreserving = table.rowKeyOrderOptimizable();
         } else {
-            this.isOrderPreserving = true;
-            this.pkPositionOffset = 0;
+            isOrderPreserving = true;
         }
         this.groupBy = groupBy;
-        this.visitor = new TrackOrderPreservingExpressionVisitor(projector);
-        this.orderPreservingInfos = Lists.newArrayListWithExpectedSize(nNodes);
         this.ordering = ordering;
         this.whereExpression = whereExpression;
+        if(inputOrderBys != null) {
+            this.inputOrderBys = inputOrderBys;
+        } else {
+            this.getInputOrderBys(innerQueryPlan, groupBy, context);
+        }
+        if(this.inputOrderBys.isEmpty()) {
+            return;
+        }
+        this.trackOrderByContexts = new ArrayList<TrackOrderByContext>(this.inputOrderBys.size());
+        for(OrderBy inputOrderBy : this.inputOrderBys) {
+            this.trackOrderByContexts.add(
+                    new TrackOrderByContext(isOrderPreserving, nNodes, inputOrderBy));
+        }
     }
-    
-    public void track(Expression node) {
-        SortOrder sortOrder = node.getSortOrder();
-        track(node, sortOrder, null);
+
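+    /**
+     * Infers the input OrderBys: from the GROUP BY if it is not empty, otherwise from the innerQueryPlan's output OrderBys, or, if the innerQueryPlan is null, from the pk columns of the {@link PTable}.
+     * @param innerQueryPlan the inner plan whose output OrderBys are used when there is no GROUP BY; may be null
+     * @param groupBy the GROUP BY, converted to a single OrderBy when it is not empty
+     * @param statementContext supplies the table and connection used when innerQueryPlan is null
+     * @throws SQLException
+     */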
+    private void getInputOrderBys(QueryPlan innerQueryPlan, GroupBy groupBy, StatementContext statementContext) throws SQLException {
+        if(!groupBy.isEmpty()) {
+            this.inputOrderBys = Collections.singletonList(ExpressionUtil.convertGroupByToOrderBy(groupBy, false));
+            return;
+        }
+        if(innerQueryPlan != null) {
+            this.inputOrderBys = innerQueryPlan.getOutputOrderBys();
+            return;
+        }
+        this.inputOrderBys = Collections.<OrderBy> emptyList();
+        TableRef tableRef = statementContext.getResolver().getTables().get(0);
+        PhoenixConnection phoenixConnection = statementContext.getConnection();
+        Pair<OrderBy,Integer> orderByAndRowKeyColumnOffset =
+                ExpressionUtil.getOrderByFromTable(tableRef, phoenixConnection, false);
+        OrderBy orderBy = orderByAndRowKeyColumnOffset.getFirst();
+        this.pkPositionOffset = orderByAndRowKeyColumnOffset.getSecond();
+        if(orderBy != OrderBy.EMPTY_ORDER_BY) {
+            this.inputOrderBys = Collections.singletonList(orderBy);
+        }
     }
-    
-    public void track(Expression node, SortOrder sortOrder, Boolean isNullsLast) {
-        if (isOrderPreserving) {
-            Info info = node.accept(visitor);
-            if (info == null) {
+
+    private class TrackOrderByContext {
+        private List<Info> orderPreservingTrackInfos;
+        private boolean isOrderPreserving = true;
+        private Boolean isReverse = null;
+        private int orderPreservingColumnCount = 0;
+        private final TrackOrderPreservingExpressionVisitor trackOrderPreservingExpressionVisitor;
+        private OrderBy inputOrderBy = null;
+
+        public TrackOrderByContext(boolean isOrderPreserving, int orderByNodeCount, OrderBy inputOrderBy) {
+            this.isOrderPreserving = isOrderPreserving;
+            this.trackOrderPreservingExpressionVisitor = new TrackOrderPreservingExpressionVisitor(inputOrderBy);
+            this.orderPreservingTrackInfos = Lists.newArrayListWithExpectedSize(orderByNodeCount);
+            this.inputOrderBy = inputOrderBy;
+        }
+
+        public void track(List<TrackOrderByCell> trackOrderByCells) {
+            for(TrackOrderByCell trackOrderByCell : trackOrderByCells) {
+                doTrack(trackOrderByCell.expression,
+                        trackOrderByCell.isAscending,
+                        trackOrderByCell.isNullsLast);
+            }
+        }
+
+        private void doTrack(Expression expression, Boolean isAscending, Boolean isNullsLast) {
+            if (!isOrderPreserving) {
+               return;
+            }
+            Info trackInfo = expression.accept(trackOrderPreservingExpressionVisitor);
+            if (trackInfo == null) {
                 isOrderPreserving = false;
+                return;
+            }
+            // If the expression is sorted in a different order than the specified sort order, the order is reversed;
+            // the expressions are only order preserving when all of them agree on that direction.
+            if (isAscending != null && trackInfo.ascending != isAscending.booleanValue()) {
+                if (isReverse == null) {
+                    isReverse = true;
+                } else if (!isReverse){
+                    isOrderPreserving = false;
+                    isReverse = false;
+                    return;
+                }
             } else {
-                // If the expression is sorted in a different order than the specified sort order
-                // then the expressions are not order preserving.
-                if (node.getSortOrder() != sortOrder) {
-                    if (isReverse == null) {
-                        isReverse = true;
-                    } else if (!isReverse){
-                        isOrderPreserving = false;
-                        isReverse = false;
-                        return;
-                    }
-                } else {
-                    if (isReverse == null) {
-                        isReverse = false;
-                    } else if (isReverse){
-                        isOrderPreserving = false;
-                        isReverse = false;
-                        return;
-                    }
+                if (isReverse == null) {
+                    isReverse = false;
+                } else if (isReverse){
+                    isOrderPreserving = false;
+                    isReverse = false;
+                    return;
                 }
-                if (isNullsLast!=null && node.isNullable()) {
-                    if (!Boolean.valueOf(isNullsLast).equals(isReverse)) {
-                        isOrderPreserving = false;
-                        isReverse = false;
-                        return;
+            }
+
+            if (isNullsLast!=null && expression.isNullable()) {
+                if ((trackInfo.nullsLast == isNullsLast.booleanValue()) && isReverse.booleanValue() ||
+                    (trackInfo.nullsLast != isNullsLast.booleanValue()) && !isReverse.booleanValue()) {
+                    isOrderPreserving = false;
+                    isReverse = false;
+                    return;
+                }
+            }
+            trackInfo.expression = expression;
+            orderPreservingTrackInfos.add(trackInfo);
+        }
+
+        /*
+         * Only valid AFTER call to isOrderPreserving
+         */
+        public int getOrderPreservingColumnCount() {
+            return orderPreservingColumnCount;
+        }
+
+        /**
+         * Only valid AFTER call to isOrderPreserving
+         */
+        public List<Info> getOrderPreservingTrackInfos() {
+            if(this.isOrderPreserving) {
+                return ImmutableList.copyOf(this.orderPreservingTrackInfos);
+            }
+            int orderPreservingColumnCountToUse = this.orderPreservingColumnCount - pkPositionOffset;
+            if(orderPreservingColumnCountToUse <= 0) {
+                return Collections.<Info> emptyList();
+            }
+            return ImmutableList.copyOf(this.orderPreservingTrackInfos.subList(0, orderPreservingColumnCountToUse));
+        }
+
+        public boolean isOrderPreserving() {
+            if (!isOrderPreserving) {
+                return false;
+            }
+            if (ordering == Ordering.UNORDERED) {
+                // Sort by position
+                Collections.sort(orderPreservingTrackInfos, new Comparator<Info>() {
+                    @Override
+                    public int compare(Info o1, Info o2) {
+                        int cmp = o1.pkPosition-o2.pkPosition;
+                        if (cmp != 0) return cmp;
+                        // After pk position, sort on reverse OrderPreserving ordinal: NO, YES_IF_LAST, YES
+                        // In this way, if we have an ORDER BY over a YES_IF_LAST followed by a YES, we'll
+                        // allow subsequent columns to be ordered.
+                        return o2.orderPreserving.ordinal() - o1.orderPreserving.ordinal();
                     }
+                });
+            }
+            // Determine if there are any gaps in the PK columns. If there are none, we don't need
+            // to sort in the coprocessor because the keys will already naturally be in sorted
+            // order.
+            int prevSlotSpan = 1;
+            int prevPos =  -1;
+            OrderPreserving prevOrderPreserving = OrderPreserving.YES;
+            for (int i = 0; i < orderPreservingTrackInfos.size(); i++) {
+                Info entry = orderPreservingTrackInfos.get(i);
+                int pos = entry.pkPosition;
+                isOrderPreserving &= entry.orderPreserving != OrderPreserving.NO &&
+                        prevOrderPreserving == OrderPreserving.YES &&
+                        (pos == prevPos ||
+                         pos - prevSlotSpan == prevPos  ||
+                         hasEqualityConstraints(prevPos+prevSlotSpan, pos));
+                if(!isOrderPreserving) {
+                    break;
+                }
+                prevPos = pos;
+                prevSlotSpan = entry.slotSpan;
+                prevOrderPreserving = entry.orderPreserving;
+            }
+            orderPreservingColumnCount = prevPos + prevSlotSpan + pkPositionOffset;
+            return isOrderPreserving;
+        }
+
+        private boolean hasEqualityConstraints(int startPos, int endPos) {
+            ScanRanges ranges = context.getScanRanges();
+            // If a GROUP BY is being done, then the rows are ordered according to the GROUP BY key,
+            // not by the original row key order of the table (see PHOENIX-3451).
+            // We check each expression in the gap (a GROUP BY expression, or an input ORDER BY expression when there is
+            // no GROUP BY) to see if it only references columns matched by equality constraints, in which case it is constant.
+            for (int pos = startPos; pos < endPos; pos++) {
+                Expression expressionToCheckConstant = this.getExpressionToCheckConstant(pos);
+                IsConstantVisitor visitor = new IsConstantVisitor(ranges, whereExpression);
+                Boolean isConstant = expressionToCheckConstant.accept(visitor);
+                if (!Boolean.TRUE.equals(isConstant)) {
+                    return false;
                 }
-                info.expression = node;
-                orderPreservingInfos.add(info);
             }
+            return true;
+        }
+
+        public boolean isReverse() {
+            return Boolean.TRUE.equals(isReverse);
+        }
+
+        private Expression getExpressionToCheckConstant(int columnIndex) {
+            if (!groupBy.isEmpty()) {
+                List<Expression> groupByExpressions = groupBy.getExpressions();
+                assert columnIndex < groupByExpressions.size();
+                return  groupByExpressions.get(columnIndex);
+            }
+
+            assert columnIndex < inputOrderBy.getOrderByExpressions().size();
+            return inputOrderBy.getOrderByExpressions().get(columnIndex).getExpression();
         }
     }
-    
+
+    private static class TrackOrderByCell
+    {
+        private Expression expression;
+        private Boolean isAscending;
+        private Boolean isNullsLast;
+
+        public TrackOrderByCell(Expression expression,Boolean isAscending, Boolean isNullsLast) {
+            this.expression = expression;
+            this.isAscending = isAscending;
+            this.isNullsLast = isNullsLast;
+        }
+    }
+
+    public void track(Expression expression) {
+        track(expression, null, null);
+    }
+
+    public void track(Expression expression, Boolean isAscending, Boolean isNullsLast) {
+        TrackOrderByCell trackOrderByContext =
+                new TrackOrderByCell(expression, isAscending, isNullsLast);
+        this.trackOrderByCells.add(trackOrderByContext);
+    }
+
     /*
      * Only valid AFTER call to isOrderPreserving
      */
     public int getOrderPreservingColumnCount() {
-        return orderPreservingColumnCount;
+        if(this.selectedTrackOrderByContext == null) {
+            return 0;
+        }
+        return this.selectedTrackOrderByContext.getOrderPreservingColumnCount();
     }
 
-    public List<Expression> getExpressionsFromOrderPreservingTrackInfos() {
-        assert isOrderPreserving;
-        assert (this.orderPreservingInfos != null && this.orderPreservingInfos.size() > 0);
-        List<Expression> newExpressions = new ArrayList<Expression>(this.orderPreservingInfos.size());
-        for(Info trackInfo : this.orderPreservingInfos) {
-            newExpressions.add(trackInfo.expression);
+    /**
+     * Only valid AFTER call to isOrderPreserving
+     */
+    public List<Info> getOrderPreservingTrackInfos() {
+        if(this.selectedTrackOrderByContext == null) {
+            return Collections.<Info> emptyList();
         }
-        return newExpressions;
+        return this.selectedTrackOrderByContext.getOrderPreservingTrackInfos();
     }
 
     public boolean isOrderPreserving() {
-        if (!isOrderPreserving) {
+        if(this.selectedTrackOrderByContext != null) {
+            throw new IllegalStateException("isOrderPreserving should be called only once");
+        }
+
+        if(this.trackOrderByContexts.isEmpty()) {
+           return false;
+        }
+
+        if(this.trackOrderByCells.isEmpty()) {
             return false;
         }
-        if (ordering == Ordering.UNORDERED) {
-            // Sort by position
-            Collections.sort(orderPreservingInfos, new Comparator<Info>() {
-                @Override
-                public int compare(Info o1, Info o2) {
-                    int cmp = o1.pkPosition-o2.pkPosition;
-                    if (cmp != 0) return cmp;
-                    // After pk position, sort on reverse OrderPreserving ordinal: NO, YES_IF_LAST, YES
-                    // In this way, if we have an ORDER BY over a YES_IF_LAST followed by a YES, we'll
-                    // allow subsequent columns to be ordered.
-                    return o2.orderPreserving.ordinal() - o1.orderPreserving.ordinal();
-                }
-            });
-        }
-        // Determine if there are any gaps in the PK columns (in which case we don't need
-        // to sort in the coprocessor because the keys will already naturally be in sorted
-        // order.
-        int prevSlotSpan = 1;
-        int prevPos = pkPositionOffset - 1;
-        OrderPreserving prevOrderPreserving = OrderPreserving.YES;
-        for (int i = 0; i < orderPreservingInfos.size() && isOrderPreserving; i++) {
-            Info entry = orderPreservingInfos.get(i);
-            int pos = entry.pkPosition;
-            isOrderPreserving &= entry.orderPreserving != OrderPreserving.NO && prevOrderPreserving == OrderPreserving.YES && (pos == prevPos || pos - prevSlotSpan == prevPos  || hasEqualityConstraints(prevPos+prevSlotSpan, pos));
-            prevPos = pos;
-            prevSlotSpan = entry.slotSpan;
-            prevOrderPreserving = entry.orderPreserving;
-        }
-        orderPreservingColumnCount = prevPos + prevSlotSpan;
-        return isOrderPreserving;
-    }
-    
-    private boolean hasEqualityConstraints(int startPos, int endPos) {
-        ScanRanges ranges = context.getScanRanges();
-        // If a GROUP BY is being done, then the rows are ordered according to the GROUP BY key,
-        // not by the original row key order of the table (see PHOENIX-3451).
-        // We check each GROUP BY expression to see if it only references columns that are
-        // matched by equality constraints, in which case the expression itself would be constant.
-        if (!groupBy.isEmpty()) {
-            for (int pos = startPos; pos < endPos; pos++) {
-                IsConstantVisitor visitor = new IsConstantVisitor(ranges, whereExpression);
-                List<Expression> groupByExpressions = groupBy.getExpressions();
-                if (pos >= groupByExpressions.size()) { // sanity check - shouldn't be necessary
-                    return false;
-                }
-                Expression groupByExpression = groupByExpressions.get(pos);
-                Boolean isConstant = groupByExpression.accept(visitor);
-                if (!Boolean.TRUE.equals(isConstant)) {
-                    return false;
-                }
+
+        /**
+         * At most one TrackOrderByContext can be order preserving, so stop at the first one that is; if none is, the first context is kept as the selected one.
+         */
+        for(TrackOrderByContext trackOrderByContext : this.trackOrderByContexts) {
+            trackOrderByContext.track(trackOrderByCells);
+            if(trackOrderByContext.isOrderPreserving()) {
+               this.selectedTrackOrderByContext = trackOrderByContext;
+               break;
             }
-            return true;
-        }
-        for (int pos = startPos; pos < endPos; pos++) {
-            if (!ranges.hasEqualityConstraint(pos)) {
-                return false;
+
+            if(this.selectedTrackOrderByContext == null) {
+                this.selectedTrackOrderByContext = trackOrderByContext;
             }
         }
-        return true;
+        return this.selectedTrackOrderByContext.isOrderPreserving;
     }
-    
+
     public boolean isReverse() {
-        return Boolean.TRUE.equals(isReverse);
+        if(this.selectedTrackOrderByContext == null) {
+            throw new IllegalStateException("isReverse should only be called when isOrderPreserving is true!");
+        }
+        return this.selectedTrackOrderByContext.isReverse();
     }
 
     /**
@@ -296,28 +493,53 @@ public class OrderPreservingTracker {
      *
      */
     private static class TrackOrderPreservingExpressionVisitor extends StatelessTraverseNoExpressionVisitor<Info> {
-        private final TupleProjector projector;
-        
-        public TrackOrderPreservingExpressionVisitor(TupleProjector projector) {
-            this.projector = projector;
+        private Map<Expression, Pair<Integer,OrderByExpression>> expressionToPositionAndOrderByExpression;
+
+        public TrackOrderPreservingExpressionVisitor(OrderBy orderBy) {
+            if(orderBy.isEmpty()) {
+                this.expressionToPositionAndOrderByExpression = Collections.<Expression, Pair<Integer,OrderByExpression>> emptyMap();
+                return;
+            }
+            List<OrderByExpression> orderByExpressions = orderBy.getOrderByExpressions();
+            this.expressionToPositionAndOrderByExpression = new HashMap<Expression, Pair<Integer,OrderByExpression>>(orderByExpressions.size());
+            int index = 0;
+            for(OrderByExpression orderByExpression : orderByExpressions) {
+                this.expressionToPositionAndOrderByExpression.put(
+                        orderByExpression.getExpression(),
+                        new Pair<Integer,OrderByExpression>(index++, orderByExpression));
+            }
         }
-        
+
         @Override
-        public Info visit(RowKeyColumnExpression node) {
-            return new Info(node.getPosition());
+        public Info defaultReturn(Expression expression, List<Info> childInfos) {
+            return match(expression);
         }
 
         @Override
-        public Info visit(ProjectedColumnExpression node) {
-            if (projector == null) {
-                return super.visit(node);
-            }
-            Expression expression = projector.getExpressions()[node.getPosition()];
-            // Only look one level down the projection.
-            if (expression instanceof ProjectedColumnExpression) {
-                return super.visit(node);
+        public Info visit(RowKeyColumnExpression rowKeyColumnExpression) {
+            return match(rowKeyColumnExpression);
+        }
+
+        @Override
+        public Info visit(KeyValueColumnExpression keyValueColumnExpression) {
+            return match(keyValueColumnExpression);
+        }
+
+        @Override
+        public Info visit(ProjectedColumnExpression projectedColumnExpression) {
+            return match(projectedColumnExpression);
+        }
+
+        private Info match(Expression expression)
+        {
+            Pair<Integer,OrderByExpression> positionAndOrderByExpression = this.expressionToPositionAndOrderByExpression.get(expression);
+            if(positionAndOrderByExpression == null) {
+                return null;
             }
-            return expression.accept(this);
+            return new Info(
+                    positionAndOrderByExpression.getFirst(),
+                    positionAndOrderByExpression.getSecond().isAscending(),
+                    positionAndOrderByExpression.getSecond().isNullsLast());
         }
 
         @Override
@@ -333,10 +555,18 @@ public class OrderPreservingTracker {
             // Keep the minimum value between this function and the current value,
             // so that we never increase OrderPreserving from NO or YES_IF_LAST.
             OrderPreserving orderPreserving = OrderPreserving.values()[Math.min(node.preservesOrder().ordinal(), info.orderPreserving.ordinal())];
-            if (orderPreserving == info.orderPreserving) {
+            Expression childExpression = node.getChildren().get(
+                    node.getKeyFormationTraversalIndex());
+            boolean sortOrderIsSame = node.getSortOrder() == childExpression.getSortOrder();
+            if (orderPreserving == info.orderPreserving && sortOrderIsSame) {
                 return info;
             }
-            return new Info(info, orderPreserving);
+            return new Info(
+                    info.pkPosition,
+                    info.slotSpan,
+                    orderPreserving,
+                    sortOrderIsSame ? info.ascending : !info.ascending,
+                    info.nullsLast);
         }
 
         @Override
@@ -367,10 +597,18 @@ public class OrderPreservingTracker {
                 if (lastInfo.orderPreserving == OrderPreserving.YES_IF_LAST) { return null; }
                 Info info = l.get(i);
                 // not order preserving since there's a gap in the pk
-                if (info.pkPosition != lastInfo.pkPosition + 1) { return null; }
+                if (info.pkPosition != lastInfo.pkPosition + 1) {
+                    return null;
+                 }
+                if(info.ascending != lastInfo.ascending) {
+                    return null;
+                }
+                if(info.nullsLast != lastInfo.nullsLast) {
+                    return null;
+                }
                 lastInfo = info;
             }
-            return new Info(firstInfo, l.size(), lastInfo.orderPreserving);
+            return new Info(firstInfo.pkPosition, l.size(), lastInfo.orderPreserving, lastInfo.ascending, lastInfo.nullsLast);
         }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 3e68a0b..e7d8e18 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -180,7 +180,13 @@ public class QueryCompiler {
             select.hasWildcard() ? null : select.getSelect());
         ColumnResolver resolver = FromCompiler.getResolver(tableRef);
         StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
-        QueryPlan plan = compileSingleFlatQuery(context, select, statement.getParameters(), false, false, null, null, false);
+        QueryPlan plan = compileSingleFlatQuery(
+                context,
+                select,
+                statement.getParameters(),
+                false,
+                false,
+                null);
         plan = new UnionPlan(context, select, tableRef, plan.getProjector(), plan.getLimit(),
             plan.getOffset(), plan.getOrderBy(), GroupBy.EMPTY_GROUP_BY, plans,
             context.getBindManager().getParameterMetaData());
@@ -222,12 +228,22 @@ public class QueryCompiler {
                         wildcardIncludesDynamicCols);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
                 table.projectColumns(context.getScan());
-                return compileSingleFlatQuery(context, subquery, binds, asSubquery, !asSubquery, null, projectPKColumns ? projector : null, true);
+                return compileSingleFlatQuery(
+                        context,
+                        subquery,
+                        binds,
+                        asSubquery,
+                        !asSubquery,
+                        null);
             }
             QueryPlan plan = compileSubquery(subquery, false);
             PTable projectedTable = table.createProjectedTable(plan.getProjector());
             context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), subquery.getUdfParseNodes()));
-            return new TupleProjectionPlan(plan, new TupleProjector(plan.getProjector()), null);
+            return new TupleProjectionPlan(
+                    plan,
+                    new TupleProjector(plan.getProjector()),
+                    context,
+                    null);
         }
 
         List<JoinCompiler.Strategy> strategies = joinTable.getApplicableJoinStrategies();
@@ -329,7 +345,13 @@ public class QueryCompiler {
                 }
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector,
                         wildcardIncludesDynamicCols);
-                QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                QueryPlan plan = compileSingleFlatQuery(
+                        context,
+                        query,
+                        binds,
+                        asSubquery,
+                        !asSubquery && joinTable.isAllLeftJoin(),
+                        null);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
                 Integer limit = null;
                 Integer offset = null;
@@ -383,7 +405,13 @@ public class QueryCompiler {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector,
                         wildcardIncludesDynamicCols);
                 context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), rhs.getUdfParseNodes()));
-                QueryPlan rhsPlan = compileSingleFlatQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right, null, !rhsTable.isSubselect() && projectPKColumns ? tupleProjector : null, true);
+                QueryPlan rhsPlan = compileSingleFlatQuery(
+                        context,
+                        rhs,
+                        binds,
+                        asSubquery,
+                        !asSubquery && type == JoinType.Right,
+                        null);
                 Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
                 Integer limit = null;
                 Integer offset = null;
@@ -423,7 +451,6 @@ public class QueryCompiler {
                 boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
                 QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
                 PTable lhsProjTable = lhsCtx.getResolver().getTables().get(0).getTable();
-                boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
 
                 Scan rhsScan = ScanUtil.newScan(originalScan);
                 StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
@@ -442,7 +469,21 @@ public class QueryCompiler {
                 TableRef tableRef = resolver.getTables().get(0);
                 StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
                 subCtx.setCurrentTable(tableRef);
-                QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable, lhsProjTable, needsMerge ? rhsProjTable : null, fieldPosition, lastJoinSpec.isSingleValueOnly());
+                QueryPlan innerPlan = new SortMergeJoinPlan(
+                        subCtx,
+                        joinTable.getStatement(),
+                        tableRef,
+                        type == JoinType.Right ? JoinType.Left : type,
+                        lhsPlan,
+                        rhsPlan,
+                        new Pair<List<Expression>,List<Expression>>(lhsKeyExpressions, rhsKeyExpressions),
+                        rhsKeyExpressions,
+                        projectedTable,
+                        lhsProjTable,
+                        needsMerge ? rhsProjTable : null,
+                        fieldPosition,
+                        lastJoinSpec.isSingleValueOnly(),
+                        new Pair<List<OrderByNode>,List<OrderByNode>>(lhsOrderBy, rhsOrderBy));
                 context.setCurrentTable(tableRef);
                 context.setResolver(resolver);
                 TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
@@ -454,7 +495,13 @@ public class QueryCompiler {
                         joinTable.getStatement().getUdfParseNodes())
                         : NODE_FACTORY.select(joinTable.getStatement(), from, where);
 
-                return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
+                return compileSingleFlatQuery(
+                        context,
+                        select,
+                        binds,
+                        asSubquery,
+                        false,
+                        innerPlan);
             }
             default:
                 throw new IllegalArgumentException("Invalid join strategy '" + strategy + "'");
@@ -518,25 +565,31 @@ public class QueryCompiler {
     protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
         SelectStatement innerSelect = select.getInnerSelectStatement();
         if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null);
         }
 
         QueryPlan innerPlan = compileSubquery(innerSelect, false);
-        TupleProjector tupleProjector = new TupleProjector(innerPlan.getProjector());
-        innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, null);
+        RowProjector innerQueryPlanRowProjector = innerPlan.getProjector();
+        TupleProjector tupleProjector = new TupleProjector(innerQueryPlanRowProjector);
 
         // Replace the original resolver and table with those having compiled type info.
         TableRef tableRef = context.getResolver().getTables().get(0);
-        ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerPlan.getProjector());
+        ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerQueryPlanRowProjector);
         context.setResolver(resolver);
         tableRef = resolver.getTables().get(0);
         context.setCurrentTable(tableRef);
-        boolean isInRowKeyOrder = innerPlan.getGroupBy() == GroupBy.EMPTY_GROUP_BY && innerPlan.getOrderBy() == OrderBy.EMPTY_ORDER_BY;
+        innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, context, null);
 
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, isInRowKeyOrder);
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan);
     }
 
-    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
+    protected QueryPlan compileSingleFlatQuery(
+            StatementContext context,
+            SelectStatement select,
+            List<Object> binds,
+            boolean asSubquery,
+            boolean allowPageFilter,
+            QueryPlan innerPlan) throws SQLException {
         PTable projectedTable = null;
         if (this.projectTuples) {
             projectedTable = TupleProjectionCompiler.createProjectedTable(select, context);
@@ -556,7 +609,7 @@ public class QueryCompiler {
         Integer limit = LimitCompiler.compile(context, select);
         Integer offset = OffsetCompiler.compile(context, select);
 
-        GroupBy groupBy = GroupByCompiler.compile(context, select, isInRowKeyOrder);
+        GroupBy groupBy = GroupByCompiler.compile(context, select);
         // Optimize the HAVING clause by finding any group by expressions that can be moved
         // to the WHERE clause
         select = HavingCompiler.rewrite(context, select, groupBy);
@@ -570,7 +623,7 @@ public class QueryCompiler {
         Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
         // Recompile GROUP BY now that we've figured out our ScanRanges so we know
         // definitively whether or not we'll traverse in row key order.
-        groupBy = groupBy.compile(context, innerPlanTupleProjector);
+        groupBy = groupBy.compile(context, innerPlan, where);
         context.setResolver(resolver); // recover resolver
         boolean wildcardIncludesDynamicCols = context.getConnection().getQueryServices()
                 .getConfiguration().getBoolean(WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB,
@@ -585,8 +638,7 @@ public class QueryCompiler {
                 limit,
                 offset,
                 projector,
-                innerPlanTupleProjector,
-                isInRowKeyOrder,
+                innerPlan,
                 where);
         context.getAggregationManager().compile(context, groupBy);
         // Final step is to build the query plan
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
index c2edaf3..7732119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -24,11 +24,13 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
 
@@ -67,7 +69,10 @@ public interface QueryPlan extends StatementPlan {
     Integer getLimit();
 
     Integer getOffset();
-    
+
+    /**
+     * Return the compiled Order By clause of {@link SelectStatement}.
+     */
     OrderBy getOrderBy();
 
     GroupBy getGroupBy();
@@ -92,4 +97,17 @@ public interface QueryPlan extends StatementPlan {
     public boolean useRoundRobinIterator() throws SQLException;
 
     <T> T accept(QueryPlanVisitor<T> visitor);
+
+    /**
+     * <pre>
+     * Get the actual OrderBys of this queryPlan, which may be different from {@link #getOrderBy()},
+     * because {@link #getOrderBy()} is only the compiled result of {@link SelectStatement}.
+     * The return type is List because we can get multiple OrderBys for the query result of {@link SortMergeJoinPlan},
+     * e.g. for the SQL:
+     * SELECT  * FROM T1 JOIN T2 ON T1.a = T2.a and T1.b = T2.b
+     * The result of the sort-merge-join is sorted on (T1.a, T1.b) and (T2.a, T2.b) at the same time.
+     * </pre>
+     * @return the actual OrderBys of this plan's output; may be an empty list
+     */
+    public List<OrderBy> getOutputOrderBys() ;
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatelessExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatelessExpressionCompiler.java
new file mode 100644
index 0000000..1245532
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatelessExpressionCompiler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.schema.PColumn;
+
+/**
+ * An ExpressionCompiler which does not pollute {@link StatementContext}
+ *
+ */
+public class StatelessExpressionCompiler extends ExpressionCompiler{
+
+    public StatelessExpressionCompiler(StatementContext context,
+            boolean resolveViewConstants) {
+        super(context, resolveViewConstants);
+    }
+
+    public StatelessExpressionCompiler(StatementContext context,
+            GroupBy groupBy, boolean resolveViewConstants) {
+        super(context, groupBy, resolveViewConstants);
+    }
+
+    public StatelessExpressionCompiler(StatementContext context, GroupBy groupBy) {
+        super(context, groupBy);
+    }
+
+    public StatelessExpressionCompiler(StatementContext context) {
+        super(context);
+    }
+
+    @Override
+    protected Expression addExpression(Expression expression) {
+        return expression;
+    }
+
+    @Override
+    protected void addColumn(PColumn column) {
+
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index 619ee24..fc3e263 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -289,4 +289,9 @@ public class TraceQueryPlan implements QueryPlan {
     public Long getEstimateInfoTimestamp() throws SQLException {
         return 0l;
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return Collections.<OrderBy> emptyList();
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index a54fbdd..ecdd5bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -83,7 +83,7 @@ public class UnionCompiler {
         for (int i = 0; i < plans.size(); i++) {
             QueryPlan subPlan = plans.get(i);
             TupleProjector projector = getTupleProjector(subPlan.getProjector(), targetTypes);
-            subPlan = new TupleProjectionPlan(subPlan, projector, null);
+            subPlan = new TupleProjectionPlan(subPlan, projector, null, null);
             plans.set(i, subPlan);
         }
         QueryPlan plan = plans.get(0);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index d7c3048..fb722d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -73,6 +73,7 @@ import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.CostUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,6 +92,7 @@ public class AggregatePlan extends BaseQueryPlan {
     private List<List<Scan>> scans;
     private static final Logger logger = LoggerFactory.getLogger(AggregatePlan.class);
     private boolean isSerial;
+    private OrderBy actualOutputOrderBy;
 
     public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
             RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
@@ -113,6 +115,7 @@ public class AggregatePlan extends BaseQueryPlan {
             logger.warn("This query cannot be executed serially. Ignoring the hint");
         }
         this.isSerial = hasSerialHint && canBeExecutedSerially;
+        this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, groupBy, context);
     }
 
     public Expression getHaving() {
@@ -185,21 +188,24 @@ public class AggregatePlan extends BaseQueryPlan {
         }
         @Override
         public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws SQLException {
-            Expression expression = RowKeyExpression.INSTANCE;
-            boolean isNullsLast=false;
-            boolean isAscending=true;
-            if(this.orderBy==OrderBy.REV_ROW_KEY_ORDER_BY) {
-                isNullsLast=true; //which is needed for the whole rowKey.
-                isAscending=false;
-            }
-            OrderByExpression orderByExpression = new OrderByExpression(expression, isNullsLast, isAscending);
+            /**
+             * Sort the result tuples by the GroupBy expressions.
+             * When orderByReverse is false, if some GroupBy expression is SortOrder.DESC, then sorted results on that expression are DESC, not ASC.
+             * When orderByReverse is true, if some GroupBy expression is SortOrder.DESC, then sorted results on that expression are ASC, not DESC.
+             */
+            OrderByExpression orderByExpression =
+                    OrderByExpression.createByCheckIfOrderByReverse(
+                            RowKeyExpression.INSTANCE,
+                            false,
+                            true,
+                            this.orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
             long threshold =
                     services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
-                        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+                            QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
             boolean spoolingEnabled =
                     services.getProps().getBoolean(
-                        QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
-                        QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+                            QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                            QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
             return new OrderedResultIterator(scanner,
                     Collections.<OrderByExpression> singletonList(orderByExpression),
                     spoolingEnabled, threshold);
@@ -343,4 +349,15 @@ public class AggregatePlan extends BaseQueryPlan {
         return visitor.visit(this);
     }
 
+    private static OrderBy convertActualOutputOrderBy(OrderBy orderBy, GroupBy groupBy, StatementContext statementContext) {
+        if(!orderBy.isEmpty()) {
+            return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy);
+        }
+        return ExpressionUtil.convertGroupByToOrderBy(groupBy, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
+    }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+       return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 2e0c374..aa99cab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -70,6 +70,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.CostUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.TupleUtil;
 
 import com.google.common.collect.Lists;
@@ -80,6 +81,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
     private final ServerAggregators serverAggregators;
     private final ClientAggregators clientAggregators;
     private final boolean useHashAgg;
+    private OrderBy actualOutputOrderBy;
     
     public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, Integer offset, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) {
@@ -97,6 +99,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
         // Extract hash aggregate hint, if any.
         HintNode hints = statement.getHint();
         useHashAgg = hints != null && hints.hasHint(HintNode.Hint.HASH_AGGREGATE);
+        this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, groupBy, context);
     }
 
     @Override
@@ -156,7 +159,12 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
                             QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
                 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                 for (Expression keyExpression : keyExpressions) {
-                    keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
+                    /**
+                     * Sort the result tuples by the GroupBy expressions.
+                     * If some GroupBy expression is SortOrder.DESC, then sorted results on that expression are DESC, not ASC.
+                     * For ClientAggregatePlan, the orderBy should not be OrderBy.REV_ROW_KEY_ORDER_BY, which is different from {@link AggregatePlan.OrderingResultIteratorFactory#newIterator}.
+                     **/
+                    keyExpressionOrderBy.add(OrderByExpression.createByCheckIfOrderByReverse(keyExpression, false, true, false));
                 }
 
                 if (useHashAgg) {
@@ -325,4 +333,25 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
                     + resultIterator + ", aggregators=" + aggregators + "]";
         }
     }
+
+    private OrderBy convertActualOutputOrderBy(OrderBy orderBy, GroupBy groupBy, StatementContext statementContext) {
+        if(!orderBy.isEmpty()) {
+            return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy);
+        }
+
+        if(this.useHashAgg &&
+           !groupBy.isEmpty() &&
+           !groupBy.isOrderPreserving() &&
+           orderBy != OrderBy.FWD_ROW_KEY_ORDER_BY &&
+           orderBy != OrderBy.REV_ROW_KEY_ORDER_BY) {
+            return OrderBy.EMPTY_ORDER_BY;
+        }
+
+        return ExpressionUtil.convertGroupByToOrderBy(groupBy, orderBy == OrderBy.REV_ROW_KEY_ORDER_BY);
+    }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+       return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 4a54a41..f39e5bf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.execute;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -42,15 +43,19 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.CostUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 
 import com.google.common.collect.Lists;
 
 public class ClientScanPlan extends ClientProcessingPlan {
 
+    private List<OrderBy> actualOutputOrderBys;
+
     public ClientScanPlan(StatementContext context, FilterableStatement statement, TableRef table,
             RowProjector projector, Integer limit, Integer offset, Expression where, OrderBy orderBy,
             QueryPlan delegate) {
         super(context, statement, table, projector, limit, offset, where, orderBy, delegate);
+        this.actualOutputOrderBys = convertActualOutputOrderBy(orderBy, delegate, context);
     }
 
     @Override
@@ -142,4 +147,21 @@ public class ClientScanPlan extends ClientProcessingPlan {
         return new ExplainPlan(planSteps);
     }
 
+    private static List<OrderBy> convertActualOutputOrderBy(
+            OrderBy orderBy,
+            QueryPlan targetQueryPlan,
+            StatementContext statementContext) {
+
+        if(!orderBy.isEmpty()) {
+            return Collections.singletonList(OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy));
+        }
+
+        assert orderBy != OrderBy.REV_ROW_KEY_ORDER_BY;
+        return targetQueryPlan.getOutputOrderBys();
+    }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+       return this.actualOutputOrderBys;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
index 0ecf74d..c6678cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CursorFetchPlan.java
@@ -71,7 +71,7 @@ public class CursorFetchPlan extends DelegateQueryPlan {
 		return fetchSize;
 	}
 
-        public boolean isAggregate(){
-            return this.isAggregate;
-        }
+	public boolean isAggregate(){
+	    return this.isAggregate;
+	}
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
index 3da06db..4724edd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
@@ -162,4 +162,9 @@ public abstract class DelegateQueryPlan implements QueryPlan {
     public Long getEstimateInfoTimestamp() throws SQLException {
         return delegate.getEstimateInfoTimestamp();
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return delegate.getOutputOrderBys();
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index 255fca3..a9f9d8e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -142,4 +142,9 @@ public class LiteralResultIterationPlan extends BaseQueryPlan {
     public Long getEstimateInfoTimestamp() throws SQLException {
         return 0l;
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return Collections.<OrderBy> emptyList();
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 0ad0d1b..38d47c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.schema.SaltingUtil;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.StatisticsUtil;
 import org.apache.phoenix.util.CostUtil;
+import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
@@ -96,6 +97,7 @@ public class ScanPlan extends BaseQueryPlan {
     private Long serialRowsEstimate;
     private Long serialBytesEstimate;
     private Long serialEstimateInfoTs;
+    private OrderBy actualOutputOrderBy;
 
     public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit,
             Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, 
@@ -126,6 +128,7 @@ public class ScanPlan extends BaseQueryPlan {
             serialRowsEstimate = estimate.getSecond();
             serialEstimateInfoTs = StatisticsUtil.NOT_STATS_BASED_TS;
         }
+        this.actualOutputOrderBy = convertActualOutputOrderBy(orderBy, context);
     }
 
     private static boolean isSerial(StatementContext context, FilterableStatement statement,
@@ -350,4 +353,25 @@ public class ScanPlan extends BaseQueryPlan {
         }
         return super.getEstimateInfoTimestamp();
     }
+
+    private static OrderBy convertActualOutputOrderBy(OrderBy orderBy, StatementContext statementContext) throws SQLException {
+        if(!orderBy.isEmpty()) {
+            return OrderBy.convertCompiledOrderByToOutputOrderBy(orderBy);
+        }
+
+        if(!ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, statementContext)) {
+            return OrderBy.EMPTY_ORDER_BY;
+        }
+
+        TableRef tableRef = statementContext.getResolver().getTables().get(0);
+        return ExpressionUtil.getOrderByFromTable(
+                tableRef,
+                statementContext.getConnection(),
+                orderBy == OrderBy.REV_ROW_KEY_ORDER_BY).getFirst();
+    }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+       return OrderBy.wrapForOutputOrderBys(this.actualOutputOrderBy);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index 73597c6..8e20908 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -20,30 +20,25 @@ package org.apache.phoenix.execute;
 import static org.apache.phoenix.util.NumberUtil.add;
 import static org.apache.phoenix.util.NumberUtil.getMin;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatelessExpressionCompiler;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -52,8 +47,8 @@ import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.execute.visitor.ByteCountVisitor;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
-import org.apache.phoenix.iterate.BufferedQueue;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PhoenixQueues;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -62,19 +57,21 @@ import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.ResultUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
@@ -103,10 +100,23 @@ public class SortMergeJoinPlan implements QueryPlan {
     private Long estimatedRows;
     private Long estimateInfoTs;
     private boolean getEstimatesCalled;
-
-    public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, 
-            JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
-            PTable joinedTable, PTable lhsTable, PTable rhsTable, int rhsFieldPosition, boolean isSingleValueOnly) {
+    private List<OrderBy> actualOutputOrderBys;
+
+    public SortMergeJoinPlan(
+            StatementContext context,
+            FilterableStatement statement,
+            TableRef table,
+            JoinType type,
+            QueryPlan lhsPlan,
+            QueryPlan rhsPlan,
+            Pair<List<Expression>,List<Expression>> lhsAndRhsKeyExpressions,
+            List<Expression> rhsKeyExpressions,
+            PTable joinedTable,
+            PTable lhsTable,
+            PTable rhsTable,
+            int rhsFieldPosition,
+            boolean isSingleValueOnly,
+            Pair<List<OrderByNode>,List<OrderByNode>> lhsAndRhsOrderByNodes) throws SQLException {
         if (type == JoinType.Right) throw new IllegalArgumentException("JoinType should not be " + type);
         this.context = context;
         this.statement = statement;
@@ -114,8 +124,8 @@ public class SortMergeJoinPlan implements QueryPlan {
         this.type = type;
         this.lhsPlan = lhsPlan;
         this.rhsPlan = rhsPlan;
-        this.lhsKeyExpressions = lhsKeyExpressions;
-        this.rhsKeyExpressions = rhsKeyExpressions;
+        this.lhsKeyExpressions = lhsAndRhsKeyExpressions.getFirst();
+        this.rhsKeyExpressions = lhsAndRhsKeyExpressions.getSecond();
         this.joinedSchema = buildSchema(joinedTable);
         this.lhsSchema = buildSchema(lhsTable);
         this.rhsSchema = buildSchema(rhsTable);
@@ -132,6 +142,7 @@ public class SortMergeJoinPlan implements QueryPlan {
                 context.getConnection().getQueryServices().getProps().getBoolean(
                     QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB,
                     QueryServicesOptions.DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED);
+        this.actualOutputOrderBys = convertActualOutputOrderBy(lhsAndRhsOrderByNodes.getFirst(), lhsAndRhsOrderByNodes.getSecond(), context);
     }
 
     @Override
@@ -704,4 +715,72 @@ public class SortMergeJoinPlan implements QueryPlan {
                     getMin(lhsPlan.getEstimateInfoTimestamp(), rhsPlan.getEstimateInfoTimestamp());
         }
     }
+
+    /**
+     * We do not use {@link #lhsKeyExpressions} and {@link #rhsKeyExpressions} directly because {@link #lhsKeyExpressions} is compiled by the
+     * {@link ColumnResolver} of lhs and {@link #rhsKeyExpressions} is compiled by the {@link ColumnResolver} of rhs, so we must recompile using
+     * the {@link ColumnResolver} of joinProjectedTables.
+     * @param lhsOrderByNodes
+     * @param rhsOrderByNodes
+     * @param statementContext
+     * @return
+     * @throws SQLException
+     */
+    private static List<OrderBy> convertActualOutputOrderBy(
+            List<OrderByNode> lhsOrderByNodes,
+            List<OrderByNode> rhsOrderByNodes,
+            StatementContext statementContext) throws SQLException {
+
+        List<OrderBy> orderBys = new ArrayList<OrderBy>(2);
+        List<OrderByExpression> lhsOrderByExpressions =
+                compileOrderByNodes(lhsOrderByNodes, statementContext);
+        if(!lhsOrderByExpressions.isEmpty()) {
+            orderBys.add(new OrderBy(lhsOrderByExpressions));
+        }
+
+        List<OrderByExpression> rhsOrderByExpressions =
+                compileOrderByNodes(rhsOrderByNodes, statementContext);
+        if(!rhsOrderByExpressions.isEmpty()) {
+            orderBys.add(new OrderBy(rhsOrderByExpressions));
+        }
+        if(orderBys.isEmpty()) {
+            return Collections.<OrderBy> emptyList();
+        }
+        return orderBys;
+    }
+
+    private static List<OrderByExpression> compileOrderByNodes(List<OrderByNode> orderByNodes, StatementContext statementContext) throws SQLException {
+        /**
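+         * If a TableNotFoundException, ColumnNotFoundException or ColumnFamilyNotFoundException is thrown, it means that the orderByNode is not referenced by other parts of the SQL,
+         * so it can be ignored; compilation stops there and only the expressions compiled so far are returned.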
+         */
+        StatelessExpressionCompiler expressionCompiler = new StatelessExpressionCompiler(statementContext);
+        List<OrderByExpression> orderByExpressions = new ArrayList<OrderByExpression>(orderByNodes.size());
+        for(OrderByNode orderByNode : orderByNodes) {
+            expressionCompiler.reset();
+            Expression expression = null;
+            try {
+                expression = orderByNode.getNode().accept(expressionCompiler);
+            } catch(TableNotFoundException exception) {
+                return orderByExpressions;
+            } catch(ColumnNotFoundException exception) {
+                return orderByExpressions;
+            } catch(ColumnFamilyNotFoundException exception) {
+                return orderByExpressions;
+            }
+            assert expression != null;
+            orderByExpressions.add(
+                    OrderByExpression.createByCheckIfOrderByReverse(
+                            expression,
+                            orderByNode.isNullsLast(),
+                            orderByNode.isAscending(),
+                            false));
+        }
+        return orderByExpressions;
+    }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return this.actualOutputOrderBys;
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index f869a4c..4b56c23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -18,17 +18,32 @@
 package org.apache.phoenix.execute;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderPreservingTracker;
+import org.apache.phoenix.compile.OrderPreservingTracker.Info;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.OrderPreservingTracker.Ordering;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.FilterResultIterator;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 import com.google.common.collect.Lists;
@@ -36,12 +51,102 @@ import com.google.common.collect.Lists;
 public class TupleProjectionPlan extends DelegateQueryPlan {
     private final TupleProjector tupleProjector;
     private final Expression postFilter;
+    private final StatementContext statementContext;
+    private ColumnResolver columnResolver = null;
+    private List<OrderBy> actualOutputOrderBys = Collections.<OrderBy> emptyList();
 
-    public TupleProjectionPlan(QueryPlan plan, TupleProjector tupleProjector, Expression postFilter) {
+    public TupleProjectionPlan(
+            QueryPlan plan,
+            TupleProjector tupleProjector,
+            StatementContext statementContext, // may be null; the output OrderBys then stay the empty list
+            Expression postFilter) throws SQLException {
         super(plan);
         if (tupleProjector == null) throw new IllegalArgumentException("tupleProjector is null");
         this.tupleProjector = tupleProjector;
+        this.statementContext = statementContext;
         this.postFilter = postFilter;
+        if(this.statementContext != null) { // columnResolver and the output OrderBys are only derived when a context is given
+            this.columnResolver = statementContext.getResolver();
+            this.actualOutputOrderBys = this.convertInputOrderBys(plan);
+        }
+    }
+
+    /**
+     * Map the expressions in the output OrderBys of targetQueryPlan to {@link ProjectedColumnExpression}; OrderBys that convert to nothing are dropped.
+     * @param targetQueryPlan the plan whose {@link QueryPlan#getOutputOrderBys()} are converted
+     * @return the converted OrderBys, or an empty list if there is no input OrderBy or none could be converted
+     * @throws SQLException propagated from {@link #convertSingleInputOrderBy}
+     */
+    private List<OrderBy> convertInputOrderBys(QueryPlan targetQueryPlan) throws SQLException {
+        List<OrderBy> inputOrderBys = targetQueryPlan.getOutputOrderBys();
+        if(inputOrderBys.isEmpty()) {
+            return Collections.<OrderBy> emptyList();
+        }
+        Expression[] selectColumnExpressions = this.tupleProjector.getExpressions();
+        Map<Expression,Integer> selectColumnExpressionToIndex =
+                new HashMap<Expression, Integer>(selectColumnExpressions.length);
+        int columnIndex = 0;
+        for(Expression selectColumnExpression : selectColumnExpressions) {
+            selectColumnExpressionToIndex.put(selectColumnExpression, columnIndex++); // if equal expressions repeat, the last index wins
+        }
+        List<OrderBy> newOrderBys = new ArrayList<OrderBy>(inputOrderBys.size());
+        for(OrderBy inputOrderBy : inputOrderBys) {
+            OrderBy newOrderBy = this.convertSingleInputOrderBy(
+                    selectColumnExpressionToIndex,
+                    selectColumnExpressions,
+                    inputOrderBy);
+            if(newOrderBy != OrderBy.EMPTY_ORDER_BY) { // identity check against the sentinel returned by convertSingleInputOrderBy
+                newOrderBys.add(newOrderBy);
+            }
+        }
+        if(newOrderBys.isEmpty()) {
+            return Collections.<OrderBy> emptyList();
+        }
+        return newOrderBys;
+    }
+
+    private OrderBy convertSingleInputOrderBy(
+            Map<Expression,Integer> selectColumnExpressionToIndex,
+            Expression[] selectColumnExpressions,
+            OrderBy inputOrderBy) throws SQLException {
+        // Track every select column expression against inputOrderBy, then rebuild the tracked part over ProjectedColumnExpressions (EMPTY_ORDER_BY if nothing is tracked).
+        OrderPreservingTracker orderPreservingTracker = new OrderPreservingTracker(
+                this.statementContext,
+                GroupBy.EMPTY_GROUP_BY,
+                Ordering.UNORDERED,
+                selectColumnExpressions.length,
+                Collections.singletonList(inputOrderBy),
+                null,
+                null);
+        for(Expression selectColumnExpression : selectColumnExpressions) {
+            orderPreservingTracker.track(selectColumnExpression);
+        }
+        orderPreservingTracker.isOrderPreserving(); // result ignored; presumably called so that the track infos read below are computed - TODO confirm
+        List<Info> orderPreservingTrackInfos = orderPreservingTracker.getOrderPreservingTrackInfos();
+        if(orderPreservingTrackInfos.isEmpty()) {
+            return OrderBy.EMPTY_ORDER_BY;
+        }
+        List<OrderByExpression> newOrderByExpressions = new ArrayList<OrderByExpression>(orderPreservingTrackInfos.size());
+        for(Info orderPreservingTrackInfo : orderPreservingTrackInfos) {
+            Expression expression = orderPreservingTrackInfo.getExpression();
+            Integer index = selectColumnExpressionToIndex.get(expression);
+            assert index != null; // NOTE(review): with assertions disabled, a missing entry fails with a NullPointerException on unboxing in the next line
+            ProjectedColumnExpression projectedValueColumnExpression = this.getProjectedValueColumnExpression(index);
+            OrderByExpression newOrderByExpression = OrderByExpression.createByCheckIfOrderByReverse(
+                    projectedValueColumnExpression,
+                    orderPreservingTrackInfo.isNullsLast(),
+                    orderPreservingTrackInfo.isAscending(),
+                    false); // orderByReverse=false: keep the tracked isNullsLast/isAscending
+            newOrderByExpressions.add(newOrderByExpression);
+        }
+        return new OrderBy(newOrderByExpressions);
+    }
+
+    private ProjectedColumnExpression getProjectedValueColumnExpression(int columnIndex) throws SQLException { // columnIndex is used as the column position in the resolver's first table
+        assert this.columnResolver != null; // only set by the constructor when a StatementContext was given
+        TableRef tableRef = this.columnResolver.getTables().get(0); // always the first table of the resolver
+        ColumnRef columnRef = new ColumnRef(tableRef, columnIndex);
+        return (ProjectedColumnExpression)columnRef.newColumnExpression(); // NOTE(review): unchecked downcast; assumes the first table yields ProjectedColumnExpression - confirm
     }
 
     @Override
@@ -84,4 +189,9 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
     public <T> T accept(QueryPlanVisitor<T> visitor) {
         return visitor.visit(this);
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return this.actualOutputOrderBys; // empty if the constructor got a null StatementContext or no input OrderBy could be converted
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 6114d66..07a3aa5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.util.NumberUtil.getMin;
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -48,6 +49,7 @@ import org.apache.phoenix.optimize.Cost;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ExpressionUtil;
 
 import com.google.common.collect.Sets;
 
@@ -299,4 +301,15 @@ public class UnionPlan implements QueryPlan {
             }
         }
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() { // a single OrderBy derived from this.orderBy, or the empty list when this.orderBy is empty
+        assert this.groupBy == GroupBy.EMPTY_GROUP_BY; // this method expects a plan without GROUP BY
+        assert this.orderBy != OrderBy.FWD_ROW_KEY_ORDER_BY && this.orderBy != OrderBy.REV_ROW_KEY_ORDER_BY; // and never one of the row key order sentinels
+        if(!this.orderBy.isEmpty()) {
+            return Collections.<OrderBy> singletonList(
+                    OrderBy.convertCompiledOrderByToOutputOrderBy(this.orderBy));
+        }
+        return Collections.<OrderBy> emptyList();
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 0bc3df4..896d1ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -18,12 +18,14 @@
 package org.apache.phoenix.execute;
 
 import java.sql.SQLException;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
 import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
@@ -188,4 +190,9 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
             return PInteger.INSTANCE;
         }
     }
+
+    @Override
+    public List<OrderBy> getOutputOrderBys() {
+        return Collections.<OrderBy> emptyList(); // this plan never reports an output ordering
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
index 50a7847..2978bee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/OrderByExpression.java
@@ -26,7 +26,10 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.OrderByCompiler;
+import org.apache.phoenix.compile.OrderPreservingTracker.Info;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.execute.AggregatePlan;
 
 /**
  * A container for a column that appears in ORDER BY clause.
@@ -38,14 +41,92 @@ public class OrderByExpression implements Writable {
     
     public OrderByExpression() {
     }
-    
-    public OrderByExpression(Expression expression, boolean isNullsLast, boolean isAcending) {
+
+    private OrderByExpression(Expression expression, boolean isNullsLast, boolean isAcending) { // private: code outside this class goes through the static factory methods
         checkNotNull(expression);
         this.expression = expression;
         this.isNullsLast = isNullsLast;
         this.isAscending = isAcending;
     }
 
+    /**
+     * Re-create the given orderByExpression via {@link #createByCheckIfExpressionSortOrderDesc}: if {@link Expression#getSortOrder()} is {@link SortOrder#DESC},
+     * the isAscending of the returned OrderByExpression is reversed, but isNullsLast is untouched.
+     * A new OrderByExpression is always returned, even when nothing is reversed.
+     * @param orderByExpression the source of the expression, isNullsLast and isAscending
+     * @return the new OrderByExpression
+     */
+    public static OrderByExpression convertIfExpressionSortOrderDesc(OrderByExpression orderByExpression) {
+        return createByCheckIfExpressionSortOrderDesc(
+                orderByExpression.getExpression(),
+                orderByExpression.isNullsLast(),
+                orderByExpression.isAscending());
+    }
+
+    /**
+     * If {@link Expression#getSortOrder()} is {@link SortOrder#DESC}, reverse the isAscending, but leave isNullsLast untouched.
+     * A typical case is in {@link OrderByCompiler#compile}, to get the compiled {@link OrderByExpression} to be used by {@link OrderedResultIterator}.
+     * @param expression the expression to order by; its sort order decides whether isAscending is reversed
+     * @param isNullsLast stored unchanged in the result
+     * @param isAscending the requested direction, before the sort order of the expression is taken into account
+     * @return the new OrderByExpression
+     */
+    public static OrderByExpression createByCheckIfExpressionSortOrderDesc(Expression expression, boolean isNullsLast, boolean isAscending) {
+        if(expression.getSortOrder() == SortOrder.DESC) {
+            isAscending = !isAscending;
+        }
+        return new OrderByExpression(expression, isNullsLast, isAscending);
+    }
+
+    /**
+     * If orderByReverse is true, reverse both the isNullsLast and the isAscending; otherwise use them as given.
+     * A typical case is in {@link AggregatePlan.OrderingResultIteratorFactory#newIterator}
+     * @param expression the expression to order by; its sort order is not consulted here
+     * @param isNullsLast whether nulls sort last, before the optional reversal
+     * @param isAscending whether the order is ascending, before the optional reversal
+     * @param orderByReverse if true, both flags are negated
+     * @return the new OrderByExpression
+     */
+    public static OrderByExpression createByCheckIfOrderByReverse(Expression expression, boolean isNullsLast, boolean isAscending, boolean orderByReverse) {
+        if(orderByReverse) {
+            isNullsLast = !isNullsLast;
+            isAscending = !isAscending;
+        }
+        return new OrderByExpression(expression, isNullsLast, isAscending);
+    }
+
+    /**
+     * Create OrderByExpression from expression: isNullsLast is the default value "false", isAscending is true only if {@link Expression#getSortOrder()} is {@link SortOrder#ASC}.
+     * If orderByReverse is true, both the isNullsLast and the isAscending are then reversed.
+     * @param expression the expression to order by
+     * @param orderByReverse if true, both flags are negated
+     * @return the new OrderByExpression
+     */
+    public static OrderByExpression convertExpressionToOrderByExpression(Expression expression, boolean orderByReverse) {
+      return convertExpressionToOrderByExpression(expression, null, orderByReverse);
+    }
+
+    /**
+     * Create OrderByExpression from expression. If orderPreservingTrackInfo is not null, its isNullsLast and isAscending are used; otherwise isNullsLast is false
+     * and isAscending is true only if {@link Expression#getSortOrder()} is {@link SortOrder#ASC}. If orderByReverse is true, both flags are then reversed.
+     * @param expression the expression to order by
+     * @param orderPreservingTrackInfo may be null; when present it overrides both default flags
+     * @param orderByReverse if true, both flags are negated
+     * @return the new OrderByExpression
+     */
+    public static OrderByExpression convertExpressionToOrderByExpression(
+            Expression expression,
+            Info orderPreservingTrackInfo,
+            boolean orderByReverse) {
+        boolean isNullsLast = false;
+        boolean isAscending = expression.getSortOrder() == SortOrder.ASC;
+        if(orderPreservingTrackInfo != null) {
+            isNullsLast = orderPreservingTrackInfo.isNullsLast();
+            isAscending = orderPreservingTrackInfo.isAscending();
+        }
+        return OrderByExpression.createByCheckIfOrderByReverse(expression, isNullsLast, isAscending, orderByReverse);
+    }
+
     public Expression getExpression() {
         return expression;
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 6fe1d62..a42c5f8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -766,6 +766,11 @@ public class PhoenixStatement implements Statement, SQLCloseable {
                 public Long getEstimateInfoTimestamp() throws SQLException {
                     return estimateTs;
                 }
+
+                @Override
+                public List<OrderBy> getOutputOrderBys() {
+                    return Collections.<OrderBy> emptyList(); // this anonymous plan reports no output ordering
+                }
             };
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
index e737721..c6854ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ExpressionUtil.java
@@ -10,11 +10,21 @@
 package org.apache.phoenix.util;
 
 import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.TreeMap;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.ExpressionCompiler;
+import org.apache.phoenix.compile.OrderPreservingTracker.Info;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
@@ -22,10 +32,17 @@ import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.IsNullExpression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor;
 import org.apache.phoenix.expression.visitor.StatelessTraverseNoExpressionVisitor;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.ProjectedColumn;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PBoolean;
 import org.apache.phoenix.schema.types.PDataType;
@@ -224,4 +241,281 @@ public class ExpressionUtil {
             return Boolean.TRUE;
         }
     }
+
+    /**
+     * <pre>
+     * Infer the OrderBy from the rowkey columns of the {@link PTable}. A projected table may have no rowkey columns,
+     * so in that case we move forward to inspect its {@link ProjectedColumn}s by {@link #getOrderByFromProjectedTable}.
+     * The second part of the returned pair is the rowkey column offset we must skip when we create OrderBys, because for a table with salted/multiTenant/viewIndexId,
+     * some leading rowkey columns should be skipped. If nothing can be inferred, the pair is (OrderBy.EMPTY_ORDER_BY, 0).
+     * </pre>
+     * @param tableRef the table to infer the OrderBy from
+     * @param phoenixConnection passed on to the helpers that compute the rowkey column offset
+     * @param orderByReverse if true, the isNullsLast and isAscending of every created OrderByExpression are reversed
+     * @return pair of (inferred OrderBy, rowkey column offset)
+     * @throws SQLException propagated from {@link #getOrderByFromProjectedTable}
+     */
+    public static Pair<OrderBy,Integer> getOrderByFromTable(
+            TableRef tableRef,
+            PhoenixConnection phoenixConnection,
+            boolean orderByReverse) throws SQLException {
+
+        PTable table = tableRef.getTable();
+        Pair<OrderBy,Integer> orderByAndRowKeyColumnOffset =
+                getOrderByFromTableByRowKeyColumn(table, phoenixConnection, orderByReverse);
+        if(orderByAndRowKeyColumnOffset.getFirst() != OrderBy.EMPTY_ORDER_BY) { // the rowkey columns were enough
+            return orderByAndRowKeyColumnOffset;
+        }
+        if(table.getType() == PTableType.PROJECTED) { // fallback that only applies to projected tables
+            orderByAndRowKeyColumnOffset =
+                    getOrderByFromProjectedTable(tableRef, phoenixConnection, orderByReverse);
+            if(orderByAndRowKeyColumnOffset.getFirst() != OrderBy.EMPTY_ORDER_BY) {
+                return orderByAndRowKeyColumnOffset;
+            }
+        }
+        return new Pair<OrderBy,Integer>(OrderBy.EMPTY_ORDER_BY, 0);
+    }
+
+    /**
+     * Infer the OrderBy from the rowkey columns of the {@link PTable}; if no rowkey column is left after the offset, the pair is (OrderBy.EMPTY_ORDER_BY, 0).
+     * The second part of the returned pair is the rowkey column offset we must skip when we create OrderBys, because for a table with salted/multiTenant/viewIndexId,
+     * some leading rowkey columns should be skipped.
+     * @param table the table whose rowkey columns are used
+     * @param phoenixConnection passed on to {@link #getRowKeyColumnExpressionsFromTable}
+     * @param orderByReverse if true, the isNullsLast and isAscending of every created OrderByExpression are reversed
+     * @return pair of (inferred OrderBy, rowkey column offset)
+     */
+    public static Pair<OrderBy,Integer> getOrderByFromTableByRowKeyColumn(
+            PTable table,
+            PhoenixConnection phoenixConnection,
+            boolean orderByReverse) {
+        Pair<List<RowKeyColumnExpression>,Integer> rowKeyColumnExpressionsAndRowKeyColumnOffset =
+                ExpressionUtil.getRowKeyColumnExpressionsFromTable(table, phoenixConnection);
+        List<RowKeyColumnExpression> rowKeyColumnExpressions = rowKeyColumnExpressionsAndRowKeyColumnOffset.getFirst();
+        int rowKeyColumnOffset = rowKeyColumnExpressionsAndRowKeyColumnOffset.getSecond();
+        if(rowKeyColumnExpressions.isEmpty()) {
+            return new Pair<OrderBy,Integer>(OrderBy.EMPTY_ORDER_BY,0);
+        }
+        return new Pair<OrderBy,Integer>(
+                convertRowKeyColumnExpressionsToOrderBy(rowKeyColumnExpressions, orderByReverse),
+                rowKeyColumnOffset);
+    }
+
+    /**
+     * A projected table may have no rowkey columns of its own, so we inspect each {@link ProjectedColumn} to check if its source column is a rowkey column.
+     * The OrderBy is built from the projected columns whose source columns form a gap-free run of rowkey columns starting at the source table's rowkey column offset;
+     * if the {@link ProjectedColumn}s come from more than one source table, or no such run exists, the pair is (OrderBy.EMPTY_ORDER_BY, 0).
+     * The second part of the returned pair is the source table's rowkey column offset (the leading salted/multiTenant/viewIndexId rowkey columns that are skipped).
+     * @param projectedTableRef must reference a table of type PTableType.PROJECTED (checked by assert only)
+     * @param phoenixConnection used to compute the source table's rowkey column offset
+     * @param orderByReverse if true, the isNullsLast and isAscending of every created OrderByExpression are reversed
+     * @return pair of (inferred OrderBy, source rowkey column offset)
+     * @throws SQLException presumably from the ColumnRef calls made here - TODO confirm
+     */
+    public static Pair<OrderBy,Integer> getOrderByFromProjectedTable(
+            TableRef projectedTableRef,
+            PhoenixConnection phoenixConnection,
+            boolean orderByReverse) throws SQLException {
+
+        PTable projectedTable = projectedTableRef.getTable();
+        assert projectedTable.getType() == PTableType.PROJECTED;
+        TableRef sourceTableRef = null;
+        TreeMap<Integer,ColumnRef> sourceRowKeyColumnIndexToProjectedColumnRef =
+                new TreeMap<Integer, ColumnRef>(); // sorted by the rowkey position of the source column
+
+        for(PColumn column : projectedTable.getColumns()) {
+            if(!(column instanceof ProjectedColumn)) {
+                continue;
+            }
+            ProjectedColumn projectedColumn = (ProjectedColumn)column;
+            ColumnRef sourceColumnRef = projectedColumn.getSourceColumnRef();
+            TableRef currentSourceTableRef = sourceColumnRef.getTableRef();
+            if(sourceTableRef == null) {
+                sourceTableRef = currentSourceTableRef;
+            }
+            else if(!sourceTableRef.equals(currentSourceTableRef)) {
+                return new Pair<OrderBy,Integer>(OrderBy.EMPTY_ORDER_BY, 0); // columns come from more than one source table: nothing is inferred
+            }
+            int sourceRowKeyColumnIndex = sourceColumnRef.getPKSlotPosition();
+            if(sourceRowKeyColumnIndex >= 0) { // non-negative position: the source column is treated as a rowkey column
+                ColumnRef projectedColumnRef =
+                        new ColumnRef(projectedTableRef, projectedColumn.getPosition());
+                sourceRowKeyColumnIndexToProjectedColumnRef.put(
+                        Integer.valueOf(sourceRowKeyColumnIndex), projectedColumnRef);
+            }
+        }
+
+        if(sourceTableRef == null) { // the projected table has no ProjectedColumn at all
+            return new Pair<OrderBy,Integer>(OrderBy.EMPTY_ORDER_BY, 0);
+        }
+
+        final int sourceRowKeyColumnOffset = getRowKeyColumnOffset(sourceTableRef.getTable(), phoenixConnection);
+        List<OrderByExpression> orderByExpressions = new LinkedList<OrderByExpression>();
+        int matchedSourceRowKeyColumnOffset = sourceRowKeyColumnOffset; // the next rowkey position we expect to see
+        for(Entry<Integer,ColumnRef> entry : sourceRowKeyColumnIndexToProjectedColumnRef.entrySet()) {
+            int currentRowKeyColumnOffset = entry.getKey();
+            if(currentRowKeyColumnOffset < matchedSourceRowKeyColumnOffset) {
+                continue; // a leading rowkey column below the offset: skipped
+            }
+            else if(currentRowKeyColumnOffset == matchedSourceRowKeyColumnOffset) {
+                matchedSourceRowKeyColumnOffset++;
+            }
+            else {
+                break; // gap in the rowkey positions: the remaining columns are not used
+            }
+
+            ColumnRef projectedColumnRef = entry.getValue();
+            Expression projectedValueColumnExpression = projectedColumnRef.newColumnExpression();
+            OrderByExpression orderByExpression =
+                    OrderByExpression.convertExpressionToOrderByExpression(projectedValueColumnExpression, orderByReverse);
+            orderByExpressions.add(orderByExpression);
+        }
+
+        if(orderByExpressions.isEmpty()) {
+            return new Pair<OrderBy,Integer>(OrderBy.EMPTY_ORDER_BY, 0);
+        }
+        return new Pair<OrderBy,Integer>(new OrderBy(orderByExpressions), sourceRowKeyColumnOffset);
+    }
+
+    /**
+     * Count the leading rowkey columns to skip: one each if the table is salted, is multiTenant (and the connection has a tenant id), or has a viewIndexId.
+     * @param table the table whose bucket number, multiTenant flag and viewIndexId are inspected
+     * @param phoenixConnection the multiTenant column is only counted when its tenant id is not null
+     * @return the number of leading rowkey columns to skip, from 0 to 3
+     */
+    public static int getRowKeyColumnOffset(PTable table, PhoenixConnection phoenixConnection) {
+        boolean isSalted = table.getBucketNum() != null;
+        boolean isMultiTenant = phoenixConnection.getTenantId() != null && table.isMultiTenant();
+        boolean isSharedViewIndex = table.getViewIndexId() != null;
+        return (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
+    }
+
+    /**
+     * Create one {@link RowKeyColumnExpression} per rowkey column of the {@link PTable}, starting after the rowkey column offset.
+     * The second part of the returned pair is the rowkey column offset we must skip when we create OrderBys, because for a table with salted/multiTenant/viewIndexId,
+     * some leading rowkey columns should be skipped. If the offset covers all rowkey columns, the pair is (empty list, 0).
+     * @param table the table whose rowkey columns are converted
+     * @param phoenixConnection passed on to {@link #getRowKeyColumnOffset}
+     * @return pair of (RowKeyColumnExpressions after the offset, rowkey column offset)
+     */
+    public static Pair<List<RowKeyColumnExpression>,Integer> getRowKeyColumnExpressionsFromTable(PTable table, PhoenixConnection phoenixConnection) {
+        int pkPositionOffset = getRowKeyColumnOffset(table, phoenixConnection);
+        List<PColumn> pkColumns = table.getPKColumns();
+        if(pkPositionOffset >= pkColumns.size()) {
+            return new Pair<List<RowKeyColumnExpression>,Integer>(Collections.<RowKeyColumnExpression> emptyList(), 0);
+        }
+        List<RowKeyColumnExpression> rowKeyColumnExpressions = new ArrayList<RowKeyColumnExpression>(pkColumns.size() - pkPositionOffset);
+        for(int index = pkPositionOffset; index < pkColumns.size(); index++) {
+            RowKeyColumnExpression rowKeyColumnExpression =
+                    new RowKeyColumnExpression(pkColumns.get(index), new RowKeyValueAccessor(pkColumns, index)); // the accessor is built over the full rowkey column list, skipped columns included
+            rowKeyColumnExpressions.add(rowKeyColumnExpression);
+        }
+        return new Pair<List<RowKeyColumnExpression>,Integer>(rowKeyColumnExpressions, pkPositionOffset);
+    }
+
+    /**
+     * Create the OrderBy from RowKeyColumnExpressions: isNullsLast is the default value "false", isAscending is true only if {@link Expression#getSortOrder()} is ASC.
+     * If orderByReverse is true, reverse the isNullsLast and isAscending. An empty input list yields OrderBy.EMPTY_ORDER_BY.
+     * @param rowKeyColumnExpressions the expressions to order by, in order
+     * @param orderByReverse if true, both flags of every OrderByExpression are negated
+     * @return the new OrderBy
+     */
+    public static OrderBy convertRowKeyColumnExpressionsToOrderBy(List<RowKeyColumnExpression> rowKeyColumnExpressions, boolean orderByReverse) {
+        return convertRowKeyColumnExpressionsToOrderBy(
+                rowKeyColumnExpressions, Collections.<Info> emptyList(), orderByReverse);
+    }
+
+    /**
+     * Create the OrderBy from RowKeyColumnExpressions; if orderPreservingTrackInfos is neither null nor empty, the isNullsLast and isAscending of the i-th info are used
+     * for the i-th expression. If orderByReverse is true, reverse the isNullsLast and isAscending. An empty rowKeyColumnExpressions yields OrderBy.EMPTY_ORDER_BY.
+     * @param rowKeyColumnExpressions the expressions to order by, in order
+     * @param orderPreservingTrackInfos null, empty, or exactly as many entries as rowKeyColumnExpressions; any other size throws IllegalStateException
+     * @param orderByReverse if true, both flags of every OrderByExpression are negated
+     * @return the new OrderBy
+     */
+    public static OrderBy convertRowKeyColumnExpressionsToOrderBy(
+            List<RowKeyColumnExpression> rowKeyColumnExpressions,
+            List<Info> orderPreservingTrackInfos,
+            boolean orderByReverse) {
+        if(rowKeyColumnExpressions.isEmpty()) {
+            return OrderBy.EMPTY_ORDER_BY;
+        }
+        List<OrderByExpression> orderByExpressions = new ArrayList<OrderByExpression>(rowKeyColumnExpressions.size());
+        Iterator<Info> orderPreservingTrackInfosIter = null; // stays null when there are no track infos to apply
+        if(orderPreservingTrackInfos != null && orderPreservingTrackInfos.size() > 0) {
+            if(orderPreservingTrackInfos.size() != rowKeyColumnExpressions.size()) {
+                throw new IllegalStateException(
+                        "orderPreservingTrackInfos.size():[" + orderPreservingTrackInfos.size() +
+                        "] should equals rowKeyColumnExpressions.size():[" + rowKeyColumnExpressions.size()+"]!");
+            }
+            orderPreservingTrackInfosIter = orderPreservingTrackInfos.iterator();
+        }
+        for(RowKeyColumnExpression rowKeyColumnExpression : rowKeyColumnExpressions) {
+            Info orderPreservingTrackInfo = null;
+            if(orderPreservingTrackInfosIter != null) {
+                assert orderPreservingTrackInfosIter.hasNext(); // holds because the sizes were checked above
+                orderPreservingTrackInfo = orderPreservingTrackInfosIter.next();
+            }
+            OrderByExpression orderByExpression =
+                    OrderByExpression.convertExpressionToOrderByExpression(rowKeyColumnExpression, orderPreservingTrackInfo, orderByReverse);
+            orderByExpressions.add(orderByExpression);
+        }
+        return new OrderBy(orderByExpressions);
+    }
+
+    /**
+     * Convert the GroupBy to an OrderBy; the expressions in the GroupBy are first converted to {@link RowKeyColumnExpression}. An empty GroupBy yields OrderBy.EMPTY_ORDER_BY.
+     * @param groupBy the GroupBy to convert; its track infos are only used when it is order preserving
+     * @param orderByReverse if true, the isNullsLast and isAscending of every OrderByExpression are reversed
+     * @return the new OrderBy
+     */
+    public static OrderBy convertGroupByToOrderBy(GroupBy groupBy, boolean orderByReverse) {
+        if(groupBy.isEmpty()) {
+            return OrderBy.EMPTY_ORDER_BY;
+        }
+        List<RowKeyColumnExpression> rowKeyColumnExpressions = convertGroupByToRowKeyColumnExpressions(groupBy);
+        List<Info> orderPreservingTrackInfos = Collections.<Info> emptyList(); // empty list: the default isNullsLast/isAscending are used
+        if(groupBy.isOrderPreserving()) {
+            orderPreservingTrackInfos = groupBy.getOrderPreservingTrackInfos();
+        }
+        return convertRowKeyColumnExpressionsToOrderBy(rowKeyColumnExpressions, orderPreservingTrackInfos, orderByReverse);
+    }
+
+    /**
+     * Convert each expression in the GroupBy to a {@link RowKeyColumnExpression}, in order; the convert logic is the same as {@link ExpressionCompiler#wrapGroupByExpression}.
+     * @param groupBy the GroupBy whose expressions are converted
+     * @return one RowKeyColumnExpression per GroupBy expression, or an empty list for an empty GroupBy
+     */
+    public static List<RowKeyColumnExpression> convertGroupByToRowKeyColumnExpressions(GroupBy groupBy) {
+        if(groupBy.isEmpty()) {
+            return Collections.<RowKeyColumnExpression> emptyList();
+        }
+        List<Expression> groupByExpressions = groupBy.getExpressions();
+        List<RowKeyColumnExpression> rowKeyColumnExpressions = new ArrayList<RowKeyColumnExpression>(groupByExpressions.size());
+        int columnIndex = 0;
+        for(Expression groupByExpression : groupByExpressions) {
+            RowKeyColumnExpression rowKeyColumnExpression =
+                    convertGroupByExpressionToRowKeyColumnExpression(groupBy, groupByExpression, columnIndex++); // columnIndex is the position of the expression within the GroupBy
+            rowKeyColumnExpressions.add(rowKeyColumnExpression);
+        }
+        return rowKeyColumnExpressions;
+    }
+
+    /**
+     * Convert a single expression in the GroupBy to a {@link RowKeyColumnExpression}; a typical case is in {@link ExpressionCompiler#wrapGroupByExpression}.
+     * @param groupBy supplies the key expressions used for the {@link RowKeyValueAccessor} and for the data type
+     * @param originalExpression the expression that is wrapped
+     * @param groupByColumnIndex the index into the key expressions of groupBy
+     * @return the new RowKeyColumnExpression
+     */
+    public static RowKeyColumnExpression convertGroupByExpressionToRowKeyColumnExpression(
+            GroupBy groupBy,
+            Expression originalExpression,
+            int groupByColumnIndex) {
+        RowKeyValueAccessor rowKeyValueAccessor = new RowKeyValueAccessor(groupBy.getKeyExpressions(), groupByColumnIndex);
+        return new RowKeyColumnExpression(
+                originalExpression,
+                rowKeyValueAccessor,
+                groupBy.getKeyExpressions().get(groupByColumnIndex).getDataType()); // data type comes from the key expression, not from originalExpression
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 0289e02..4d2e311 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -5502,4 +5502,455 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             }
         }
     }
+
+    @Test
+    public void testOrderPreservingForClientScanPlanBug5148() throws Exception {
+        for(boolean desc : new boolean[] {false, true}) {
+            doTestOrderPreservingForClientScanPlanBug5148(desc, false);
+            doTestOrderPreservingForClientScanPlanBug5148(desc, true);
+        }
+    }
+
+    private void doTestOrderPreservingForClientScanPlanBug5148(boolean desc, boolean salted) throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            String tableName = generateUniqueName();
+            String sql = "create table " + tableName + "( "+
+                    " pk1 char(20) not null , " +
+                    " pk2 char(20) not null, " +
+                    " pk3 char(20) not null," +
+                    " v1 varchar, " +
+                    " v2 varchar, " +
+                    " CONSTRAINT TEST_PK PRIMARY KEY ( "+
+                    "pk1 "+(desc ? "desc" : "")+", "+
+                    "pk2 "+(desc ? "desc" : "")+", "+
+                    "pk3 "+(desc ? "desc" : "")+
+                    " )) "+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+            // Outer ORDER BY v2,pk3 is a leading prefix of the inner ORDER BY v2,pk3,v1: FWD_ROW_KEY_ORDER_BY is asserted.
+            sql = "select v1 from (select v1,v2,pk3 from "+tableName+" t where pk1 = '6' order by t.v2,t.pk3,t.v1 limit 10) a order by v2,pk3";
+            QueryPlan plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // pk3 is fixed by an equality filter in the outer WHERE; the outer ORDER BY is v2,v1.
+            sql = "select v1 from (select v1,v2,pk3 from "+tableName+" t where pk1 = '6' order by t.v2,t.pk3,t.v1 limit 10) a where pk3 = '8' order by v2,v1";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Same shape as the first case, with both the inner and the outer ORDER BY descending.
+            sql = "select v1 from (select v1,v2,pk3 from "+tableName+" t where pk1 = '6' order by t.v2 desc,t.pk3 desc,t.v1 desc limit 10) a order by v2 desc ,pk3 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Aggregating inner query; the outer ORDER BY uses the aliases cnt,sub in the inner ORDER BY's column order.
+            sql = "select sub from (select substr(v2,0,2) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3),t.v2 limit 10) a order by cnt,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // The outer ORDER BY wraps the inner aliases in cast(...) and substr(...).
+            sql = "select sub from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3),t.v2 limit 10) a order by cast(cnt as bigint),substr(sub,0,1)";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the aggregate alias case.
+            sql = "select sub from (select substr(v2,0,2) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3) desc,t.v2 desc limit 10) a order by cnt desc ,sub desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Inner query has GROUP BY pk2,v2 and no ORDER BY; the expectation depends on whether the PK is declared desc.
+            sql = "select sub from (select substr(v2,0,2) sub,pk2 from "+tableName+" t where pk1 = '6' group by pk2,v2 limit 10) a order by pk2,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            if(desc) {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            } else {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            }
+            // Same inner query with pk2 desc in the outer ORDER BY; the desc/asc expectations swap.
+            sql = "select sub from (select substr(v2,0,2) sub,pk2 from "+tableName+" t where pk1 = '6' group by pk2,v2 limit 10) a order by pk2 desc,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            if(desc) {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            } else {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            }
+            // Outer ORDER BY sub,cnt over inner ORDER BY t.v2,count(pk3): the plan is asserted to keep order-by expressions.
+            sql = "select sub from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by t.v2 ,count(pk3) limit 10) a order by sub ,cnt";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            // Same inner query; the outer ORDER BY wraps the aliases in substr(...) and cast(...).
+            sql = "select sub from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by t.v2 ,count(pk3) limit 10) a order by substr(sub,0,1) ,cast(cnt as bigint)";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            // NOTE(review): this query and assertion are identical to the "order by sub ,cnt" case two cases above - possibly an accidental duplicate.
+            sql = "select sub from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by t.v2 ,count(pk3) limit 10) a order by sub ,cnt";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            // Inner ORDER BY is descending while the outer ORDER BY is ascending.
+            sql = "select v1 from (select v1,v2,pk3  from "+tableName+" t where pk1 = '6' order by t.v2 desc,t.pk3 desc,t.v1 desc limit 10) a order by v2 ,pk3";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            // The outer WHERE is an OR with a range predicate, so pk3 is not fixed to a single value.
+            sql = "select v1 from (select v1,v2,pk3 from "+tableName+" t where pk1 = '6' order by t.v2,t.pk3,t.v1 limit 10) a where pk3 = '8' or (v2 < 'abc' and pk3 > '11') order by v2,v1";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+
+            // Remaining cases: the inner query's ORDER BY uses leading row key columns (pk1,pk2).
+            sql = "select pk1 from (select pk3,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1,t.pk2 limit 10) a where pk3 > '8' order by pk1,pk2,pk3";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Same, with substr(pk3,0,3) projected as sub and used as the last outer ORDER BY column.
+            sql = "select pk1 from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1,t.pk2 limit 10) a where sub > '8' order by pk1,pk2,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the pk1,pk2,pk3 case.
+            sql = "select pk1 from (select pk3,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1 desc,t.pk2 desc limit 10) a where pk3 > '8' order by pk1 desc ,pk2 desc ,pk3 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the pk1,pk2,sub case.
+            sql = "select pk1 from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1 desc,t.pk2 desc limit 10) a where sub > '8' order by pk1 desc,pk2 desc,sub desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+        } finally {
+            if(conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    @Test
+    public void testGroupByOrderPreservingForClientAggregatePlanBug5148() throws Exception {
+        for(boolean desc : new boolean[] {false, true}) {
+            doTestGroupByOrderPreservingForClientAggregatePlanBug5148(desc, false);
+            doTestGroupByOrderPreservingForClientAggregatePlanBug5148(desc, true);
+        }
+    }
+
+    private void doTestGroupByOrderPreservingForClientAggregatePlanBug5148(boolean desc, boolean salted) throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+            String tableName = generateUniqueName();
+            String sql = "create table " + tableName + "( "+
+                    " pk1 varchar not null , " +
+                    " pk2 varchar not null, " +
+                    " pk3 varchar not null," +
+                    " v1 varchar, " +
+                    " v2 varchar, " +
+                    " CONSTRAINT TEST_PK PRIMARY KEY ( "+
+                    "pk1 "+(desc ? "desc" : "")+", "+
+                    "pk2 "+(desc ? "desc" : "")+", "+
+                    "pk3 "+(desc ? "desc" : "")+
+                    " )) "+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+            // Outer GROUP BY / ORDER BY pk2,v1 follow the leading columns of the inner ORDER BY pk2,v1,pk1.
+            sql = "select v1 from (select v1,pk2,pk1 from "+tableName+" t where pk1 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a group by pk2,v1 order by pk2,v1";
+            QueryPlan plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // pk2 is fixed by an equality filter in the outer WHERE; GROUP BY / ORDER BY use v1,pk1.
+            sql = "select v1 from (select v1,pk2,pk1 from "+tableName+" t where pk1 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' group by v1, pk1 order by v1,pk1";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Inner ORDER BY is descending on pk2,v1 and the outer ORDER BY is descending on the same columns.
+            sql = "select v1 from (select v1,pk2,pk1 from "+tableName+" t where pk1 = '6' order by t.pk2 desc,t.v1 desc,t.pk1 limit 10) a group by pk2, v1 order by pk2 desc,v1 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // The outer WHERE is an OR, so pk2 is not fixed to one value; the group-by is asserted NOT order preserving.
+            sql = "select v1 from (select v1,pk2,pk1 from "+tableName+" t where pk1 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' or (v1 < 'abc' and pk2 > '11') group by v1, pk1 order by v1,pk1";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(!plan.getGroupBy().isOrderPreserving());
+            if(desc) {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            } else {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            }
+            // Same as the previous case with pk1 desc in the outer ORDER BY; the desc/asc expectations swap.
+            sql = "select v1 from (select v1,pk2,pk1 from "+tableName+" t where pk1 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' or (v1 < 'abc' and pk2 > '11') group by v1, pk1 order by v1,pk1 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(!plan.getGroupBy().isOrderPreserving());
+            if(desc) {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            } else {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            }
+            // substr(pk1,0,1) is projected as sub and used as the last GROUP BY / ORDER BY column.
+            sql = "select sub from (select v1,pk2,substr(pk1,0,1) sub from "+tableName+" t where v2 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' group by v1,sub order by v1,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // substr(v1,0,1) is projected as sub and used as the first GROUP BY column, followed by pk1.
+            sql = "select sub from (select substr(v1,0,1) sub,pk2,pk1 from "+tableName+" t where v2 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' group by sub,pk1 order by sub,pk1";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(!plan.getGroupBy().isOrderPreserving());
+            if(desc) {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            } else {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            }
+            // Same as the previous case with pk1 desc in the outer ORDER BY; the desc/asc expectations swap.
+            sql = "select sub from (select substr(v1,0,1) sub,pk2,pk1 from "+tableName+" t where v2 = '6' order by t.pk2,t.v1,t.pk1 limit 10) a where pk2 = '8' group by sub,pk1 order by sub,pk1 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(!plan.getGroupBy().isOrderPreserving());
+            if(desc) {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            } else {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            }
+            // Aggregating inner query; the outer GROUP BY / ORDER BY use the aliases cnt,sub.
+            sql = "select sub from (select substr(v2,0,2) sub,cast (count(pk3) as bigint) cnt from "+tableName+" t where pk1 = '6' group by v1,v2 order by count(pk3),t.v2 limit 10) a group by cnt,sub order by cnt,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // The outer GROUP BY / ORDER BY wrap the inner aliases in cast(...) and substr(...).
+            sql = "select substr(sub,0,1) from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3),t.v2 limit 10) a "+
+                  "group by cast(cnt as bigint),substr(sub,0,1) order by cast(cnt as bigint),substr(sub,0,1)";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the alias case.
+            sql = "select sub from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3) desc,t.v2 desc limit 10) a group by cnt,sub order by cnt desc ,sub desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the cast/substr case.
+            sql = "select substr(sub,0,1) from (select substr(v2,0,2) sub,count(pk3) cnt from "+tableName+" t where pk1 = '6' group by v1 ,v2 order by count(pk3) desc,t.v2 desc limit 10) a "+
+                  "group by cast(cnt as bigint),substr(sub,0,1) order by cast(cnt as bigint) desc,substr(sub,0,1) desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Inner query has GROUP BY pk2,v2 and no ORDER BY; the ORDER BY expectation depends on whether the PK is declared desc.
+            sql = "select sub from (select substr(v2,0,2) sub,pk2 from "+tableName+" t where pk1 = '6' group by pk2,v2 limit 10) a group by pk2,sub order by pk2,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            if(desc) {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            } else {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            }
+            // Same with pk2 desc in the outer ORDER BY. NOTE(review): unlike the previous case, isOrderPreserving() is not asserted here.
+            sql = "select sub from (select substr(v2,0,2) sub,pk2 from "+tableName+" t where pk1 = '6' group by pk2,v2 limit 10) a group by pk2,sub order by pk2 desc,sub";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            if(desc) {
+                assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            } else {
+                assertTrue(plan.getOrderBy().getOrderByExpressions().size() > 0);
+            }
+
+            // Remaining cases: the inner query's ORDER BY uses leading row key columns (pk1,pk2).
+            sql = "select pk1 from (select pk3,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1,t.pk2 limit 10) a where pk3 > '8' group by pk1,pk2,pk3 order by pk1,pk2,pk3";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // substr(pk3,0,3) is projected as sub and grouped last; the outer ORDER BY stops at pk1,pk2.
+            sql = "select pk1 from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1,t.pk2 limit 10) a where sub > '8' group by pk1,pk2,sub order by pk1,pk2";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the pk1,pk2,pk3 case.
+            sql = "select pk1 from (select pk3,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1 desc,t.pk2 desc limit 10) a where pk3 > '8' group by pk1, pk2, pk3 order by pk1 desc ,pk2 desc ,pk3 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending variant of the pk1,pk2,sub case.
+            sql = "select pk1 from (select substr(pk3,0,3) sub,pk2,pk1 from "+tableName+" t where v1 = '6' order by t.pk1 desc,t.pk2 desc limit 10) a where sub > '8' group by pk1,pk2,sub order by pk1 desc,pk2 desc";
+            plan =  TestUtil.getOptimizeQueryPlan(conn, sql);
+            assertTrue(plan.getGroupBy().isOrderPreserving());
+            assertTrue(plan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+        } finally {
+            if(conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    @Test
+    public void testOrderPreservingForSortMergeJoinBug5148() throws Exception {
+        for(boolean desc : new boolean[] {false, true}) {
+            doTestOrderPreservingForSortMergeJoinBug5148(desc, false);
+            doTestOrderPreservingForSortMergeJoinBug5148(desc, true);
+        }
+    }
+
+    private void doTestOrderPreservingForSortMergeJoinBug5148(boolean desc, boolean salted) throws Exception {
+        Connection conn = null;
+        try {
+            conn = DriverManager.getConnection(getUrl());
+
+            String tableName1 = generateUniqueName();
+            String tableName2 = generateUniqueName();
+
+            String sql = "CREATE TABLE IF NOT EXISTS "+tableName1+" ( "+
+                    "AID INTEGER PRIMARY KEY "+(desc ? "desc" : "")+","+
+                    "AGE INTEGER"+
+                    ") "+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+
+            sql = "CREATE TABLE IF NOT EXISTS "+tableName2+" ( "+
+                    "BID INTEGER PRIMARY KEY "+(desc ? "desc" : "")+","+
+                    "CODE INTEGER"+
+                    ")"+(salted ? "SALT_BUCKETS =4" : "");
+            conn.createStatement().execute(sql);
+            // Both join sides are limited subqueries ordered by a non-PK column; ORDER BY a.aid,a.age matches the join condition columns.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code order by a.aid ,a.age";
+            QueryPlan queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // Descending ORDER BY on the same columns: the plan is asserted to keep order-by expressions.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code order by a.aid desc,a.age desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // GROUP BY a.aid,a.age with an ascending ORDER BY on the same columns.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,a.age from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code group by a.aid,a.age order by a.aid ,a.age";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+            // GROUP BY a.aid,a.age with a descending ORDER BY: group-by stays order preserving, order-by expressions remain.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,a.age from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code group by a.aid,a.age order by a.aid desc,a.age desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // The next four cases repeat the pattern using the right-hand side columns b.bid,b.code.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code order by b.bid ,b.code";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code order by b.bid desc ,b.code desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code group by b.bid, b.code order by b.bid ,b.code";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.code from (select aid,age from "+tableName1+" where age >=11 and age<=33 order by age limit 3) a inner join "+
+                    "(select bid,code from "+tableName2+" order by code limit 1) b on a.aid=b.bid and a.age = b.code group by b.bid, b.code order by b.bid desc,b.code desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // The tables are joined directly below, and ORDER BY / GROUP BY use only one of the two join condition columns.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by a.aid";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by a.aid desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code group by a.aid order by a.aid";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code group by a.aid order by a.aid desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // ORDER BY a.age alone (the second join condition column): the plan is asserted to keep order-by expressions.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ a.aid,b.code from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by a.age";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // Right-hand side counterparts of the single-column cases, using b.bid.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid,a.age from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by b.bid";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid,a.age from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by b.bid desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code group by b.bid order by b.bid";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy() == OrderBy.FWD_ROW_KEY_ORDER_BY);
+
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code group by b.bid order by b.bid desc";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getGroupBy().isOrderPreserving());
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+            // ORDER BY b.code alone (the second join condition column): the plan is asserted to keep order-by expressions.
+            sql = "select /*+ USE_SORT_MERGE_JOIN */ b.bid,a.age from "+tableName1+" a inner join "+tableName2+" b on a.aid=b.bid and a.age = b.code order by b.code";
+            queryPlan = getQueryPlan(conn, sql);
+            assertTrue(queryPlan.getOrderBy().getOrderByExpressions().size() > 0);
+        } finally {
+            if(conn!=null) {
+                conn.close();
+            }
+        }
+    }
+
+    @Test
+    public void testSortMergeBug4508() throws Exception {
+        Connection conn = null;
+        Connection conn010 = null;
+        try {
+            // Salted tables, created and verified through the plain connection; conn010 carries TenantId "010"
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            conn = DriverManager.getConnection(getUrl(), props);
+            props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            props.setProperty("TenantId", "010");
+            conn010 = DriverManager.getConnection(getUrl(), props);
+
+            String peopleTable1 = generateUniqueName();
+            String myTable1 = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + peopleTable1 + " (\n" +
+                    "PERSON_ID VARCHAR NOT NULL,\n" +
+                    "NAME VARCHAR\n" +
+                    "CONSTRAINT PK_TEST_PEOPLE PRIMARY KEY (PERSON_ID)) SALT_BUCKETS = 3");
+            conn.createStatement().execute("CREATE TABLE " + myTable1 + " (\n" +
+                    "LOCALID VARCHAR NOT NULL,\n" +
+                    "DSID VARCHAR(255) NOT NULL, \n" +
+                    "EID CHAR(40),\n" +
+                    "HAS_CANDIDATES BOOLEAN\n" +
+                    "CONSTRAINT PK_MYTABLE PRIMARY KEY (LOCALID, DSID)) SALT_BUCKETS = 3");
+            verifyQueryPlanForSortMergeBug4508(conn, peopleTable1, myTable1);
+
+            // Salted multi-tenant tables: created through conn, verified through the TenantId "010" connection
+            String peopleTable2 = generateUniqueName();
+            String myTable2 = generateUniqueName();
+            conn.createStatement().execute("CREATE TABLE " + peopleTable2 + " (\n" +
+                    "TENANT_ID VARCHAR NOT NULL,\n" +
+                    "PERSON_ID VARCHAR NOT NULL,\n" +
+                    "NAME VARCHAR\n" +
+                    "CONSTRAINT PK_TEST_PEOPLE PRIMARY KEY (TENANT_ID, PERSON_ID))\n" +
+                    "SALT_BUCKETS = 3, MULTI_TENANT=true");
+            conn.createStatement().execute("CREATE TABLE " + myTable2 + " (\n" +
+                    "TENANT_ID VARCHAR NOT NULL,\n" +
+                    "LOCALID VARCHAR NOT NULL,\n" +
+                    "DSID VARCHAR(255) NOT NULL, \n" +
+                    "EID CHAR(40),\n" +
+                    "HAS_CANDIDATES BOOLEAN\n" +
+                    "CONSTRAINT PK_MYTABLE PRIMARY KEY (TENANT_ID, LOCALID, DSID))\n" +
+                    "SALT_BUCKETS = 3, MULTI_TENANT=true");
+            verifyQueryPlanForSortMergeBug4508(conn010, peopleTable2, myTable2);
+        } finally {
+            if(conn!=null) {
+                conn.close();
+            }
+            if(conn010 != null) {
+                conn010.close();
+            }
+        }
+    }
+
+    private static void verifyQueryPlanForSortMergeBug4508(Connection conn, String peopleTable, String myTable) throws Exception {
+        String joinOnTables = "SELECT /*+ USE_SORT_MERGE_JOIN*/ COUNT(*)\n" +
+                "FROM " + peopleTable + " ds JOIN " + myTable + " l\n" +
+                "ON ds.PERSON_ID = l.LOCALID\n" +
+                "WHERE l.EID IS NULL AND l.DSID = 'PEOPLE' AND l.HAS_CANDIDATES = FALSE";
+        String joinOnSubquery = "SELECT /*+ USE_SORT_MERGE_JOIN */ COUNT(*)\n" +
+                "FROM (SELECT LOCALID FROM " + myTable + "\n" +
+                "WHERE EID IS NULL AND DSID = 'PEOPLE' AND HAS_CANDIDATES = FALSE) l\n" +
+                "JOIN " + peopleTable + " ds ON ds.PERSON_ID = l.LOCALID";
+        // Neither EXPLAIN output may contain the text "SERVER SORTED BY".
+        String[] queries = {joinOnTables, joinOnSubquery};
+        for (int i = 0; i < queries.length; i++) {
+            String explainPlan = QueryUtil.getExplainPlan(conn.createStatement().executeQuery("explain " + queries[i]));
+            assertFalse("Tables should not require sort over their PKs:\n" + explainPlan,
+                    explainPlan.contains("SERVER SORTED BY"));
+        }
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
index 88bef47..1882207 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
@@ -57,7 +57,7 @@ public class OrderedResultIteratorTest {
         RegionScanner s = Mockito.mock(RegionScanner.class);
         Scan scan = new Scan();
         Expression exp = LiteralExpression.newConstant(Boolean.TRUE);
-        OrderByExpression ex = new OrderByExpression(exp, false, false);
+        OrderByExpression ex = OrderByExpression.createByCheckIfOrderByReverse(exp, false, false, false);
         ScanRegionObserver.serializeIntoScan(scan, 0, Arrays.asList(ex), 100);
         // Check 5.1.0 & Check > 5.1.0
         ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.1.0"));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 69aeaad..937f0a5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -498,7 +498,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             public Cost getCost() {
                 return Cost.ZERO;
             }
-            
+
+            @Override
+            public List<OrderBy> getOutputOrderBys() {
+                return Collections.emptyList();
+            }
         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;