You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/11/02 18:43:02 UTC

[1/2] phoenix git commit: PHOENIX-3434 Avoid deserialization of ServerAggregators in ClientAggregatePlan to improve performance

Repository: phoenix
Updated Branches:
  refs/heads/master d7b821a7b -> 268c395e6


PHOENIX-3434 Avoid deserialization of ServerAggregators in ClientAggregatePlan to improve performance


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/268c395e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/268c395e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/268c395e

Branch: refs/heads/master
Commit: 268c395e63df2ee6b32fd23cfcb365439b2bfb93
Parents: 1841947
Author: James Taylor <ja...@apache.org>
Authored: Wed Nov 2 11:42:41 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Nov 2 11:42:52 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/ClientAggregatePlan.java | 10 ++++------
 .../phoenix/expression/aggregator/Aggregators.java  |  5 ++++-
 .../expression/aggregator/ServerAggregators.java    | 16 ++++++++++++++--
 3 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/268c395e/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 9251724..efe617e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -34,10 +34,10 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.iterate.AggregatingResultIterator;
 import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator;
@@ -68,18 +68,16 @@ import com.google.common.collect.Lists;
 public class ClientAggregatePlan extends ClientProcessingPlan {
     private final GroupBy groupBy;
     private final Expression having;
-    private final Aggregators serverAggregators;
-    private final Aggregators clientAggregators;
+    private final ServerAggregators serverAggregators;
+    private final ClientAggregators clientAggregators;
     
     public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
             Integer limit, Integer offset, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) {
         super(context, statement, table, projector, limit, offset, where, orderBy, delegate);
         this.groupBy = groupBy;
         this.having = having;
-        this.serverAggregators =
-                ServerAggregators.deserialize(context.getScan()
-                        .getAttribute(BaseScannerRegionObserver.AGGREGATORS), QueryServicesOptions.withDefaults().getConfiguration());
         this.clientAggregators = context.getAggregationManager().getAggregators();
+        this.serverAggregators = ServerAggregators.newServerAggregators(this.clientAggregators);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/268c395e/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
index cf77c8e..b1dc658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.expression.aggregator;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
@@ -58,6 +57,10 @@ abstract public class Aggregators {
         return schema;
     }
     
+    public int getMinNullableIndex() {
+        return schema.getMinNullable();
+    }
+    
     @Override
     public String toString() {
         StringBuilder buf = new StringBuilder(this.getClass().getName() + " [" + functions.length + "]:");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/268c395e/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
index 01ca733..56ffba8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -37,8 +37,6 @@ import org.apache.phoenix.schema.tuple.Tuple;
  * 
  * Aggregators that execute on the server-side
  *
- * 
- * @since 0.1
  */
 public class ServerAggregators extends Aggregators {
     public static final ServerAggregators EMPTY_AGGREGATORS = new ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
@@ -104,6 +102,20 @@ public class ServerAggregators extends Aggregators {
         return aggregators;
     }
 
+    public static ServerAggregators newServerAggregators(ClientAggregators clientAggregators) {
+        int minNullableIndex = clientAggregators.getMinNullableIndex();
+        SingleAggregateFunction[] functions = clientAggregators.getFunctions();
+        int len = functions.length;
+        Aggregator[] aggregators = new Aggregator[len];
+        Expression[] expressions = new Expression[len];
+        for (int i = 0; i < len; i++) {
+            SingleAggregateFunction aggFunc = functions[i];
+            aggregators[i] = aggFunc.getAggregator();
+            expressions[i] = aggFunc.getAggregatorExpression();
+        }
+        return new ServerAggregators(functions, aggregators,expressions, minNullableIndex);
+    }
+    
     /**
      * Deserialize aggregators from the serialized byte array representation
      * @param b byte array representation of a list of Aggregators


[2/2] phoenix git commit: PHOENIX-3208 MutationState.toMutations method would throw a exception if multiple tables are upserted (chenglei)

Posted by ja...@apache.org.
PHOENIX-3208 MutationState.toMutations method would throw a exception if multiple tables are upserted (chenglei)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18419477
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18419477
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18419477

Branch: refs/heads/master
Commit: 184194775aa1aa79a94cc93610667bd6124f059c
Parents: d7b821a
Author: James Taylor <ja...@apache.org>
Authored: Wed Nov 2 09:59:06 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Wed Nov 2 11:42:52 2016 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   |  3 +-
 .../phoenix/execute/MutationStateTest.java      | 75 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/18419477/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index d04a79b..31ab7c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -706,7 +706,7 @@ public class MutationState implements SQLCloseable {
                     }
 
                     @Override
-                    public Pair<byte[], List<Mutation>> next() {
+                     public Pair<byte[], List<Mutation>> next() {
                         Pair<PName, List<Mutation>> pair = mutationIterator.next();
                         return new Pair<byte[], List<Mutation>>(pair.getFirst().getBytes(), pair.getSecond());
                     }
@@ -727,6 +727,7 @@ public class MutationState implements SQLCloseable {
             public Pair<byte[], List<Mutation>> next() {
                 if (!innerIterator.hasNext()) {
                     current = iterator.next();
+                    innerIterator=init();
                 }
                 return innerIterator.next();
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/18419477/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 4c596ad..276d946 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -20,7 +20,20 @@ package org.apache.phoenix.execute;
 import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.schema.types.PUnsignedInt;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
 
 public class MutationStateTest {
@@ -59,4 +72,66 @@ public class MutationStateTest {
         assertEquals(4, result.length);
         assertArrayEquals(new int[] {1,2,3,4}, result);
     }
+
+    private static String getUrl() {
+        return PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + PhoenixRuntime.CONNECTIONLESS;
+    }
+
+    @Test
+    public void testToMutationsOverMultipleTables() throws Exception {
+        Connection conn = null;
+        try {
+            conn=DriverManager.getConnection(getUrl());
+            conn.createStatement().execute(
+                    "create table MUTATION_TEST1"+
+                            "( id1 UNSIGNED_INT not null primary key,"+
+                    "appId1 VARCHAR)");
+            conn.createStatement().execute(
+                    "create table MUTATION_TEST2"+
+                            "( id2 UNSIGNED_INT not null primary key,"+
+                    "appId2 VARCHAR)");
+
+            conn.createStatement().execute("upsert into MUTATION_TEST1(id1,appId1) values(111,'app1')");
+            conn.createStatement().execute("upsert into MUTATION_TEST2(id2,appId2) values(222,'app2')");
+
+
+            Iterator<Pair<byte[],List<KeyValue>>> dataTableNameAndMutationKeyValuesIter =
+                    PhoenixRuntime.getUncommittedDataIterator(conn);
+
+
+            assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext());
+            Pair<byte[],List<KeyValue>> pair=dataTableNameAndMutationKeyValuesIter.next();
+            String tableName1=Bytes.toString(pair.getFirst());
+            List<KeyValue> keyValues1=pair.getSecond();
+
+            assertTrue(dataTableNameAndMutationKeyValuesIter.hasNext());
+            pair=dataTableNameAndMutationKeyValuesIter.next();
+            String tableName2=Bytes.toString(pair.getFirst());
+            List<KeyValue> keyValues2=pair.getSecond();
+
+            if("MUTATION_TEST1".equals(tableName1)) {
+                assertTable(tableName1, keyValues1, tableName2, keyValues2);
+            }
+            else {
+                assertTable(tableName2, keyValues2, tableName1, keyValues1);
+            }
+            assertTrue(!dataTableNameAndMutationKeyValuesIter.hasNext());
+        }
+        finally {
+            if(conn!=null) {
+                conn.close();
+            }
+        }
+    }
+
+    private void assertTable(String tableName1,List<KeyValue> keyValues1,String tableName2,List<KeyValue> keyValues2) {
+        assertTrue("MUTATION_TEST1".equals(tableName1));
+        assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(111),CellUtil.cloneRow(keyValues1.get(0))));
+        assertTrue("app1".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues1.get(0)))));
+
+        assertTrue("MUTATION_TEST2".equals(tableName2));
+        assertTrue(Bytes.equals(PUnsignedInt.INSTANCE.toBytes(222),CellUtil.cloneRow(keyValues2.get(0))));
+        assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(0)))));
+
+    }
 }