You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2019/04/18 16:56:21 UTC

[asterixdb] branch master updated: [NO ISSUE][RT] Window operator performance improvement

This is an automated email from the ASF dual-hosted git repository.

dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5605582  [NO ISSUE][RT] Window operator performance improvement
5605582 is described below

commit 5605582081ec236b7e6bcc47d977f42d099095af
Author: Dmitry Lychagin <dm...@couchbase.com>
AuthorDate: Tue Apr 16 15:20:47 2019 -0700

    [NO ISSUE][RT] Window operator performance improvement
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Improve performance of window operator: save only
      last values of partitioning columns from the
      previous frame instead of the whole frame
    
    Change-Id: Ib8d6b3b99ee24c73b76fd118040ed6972e6798d9
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3346
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../app/resource/OperatorResourcesComputer.java    |  2 +-
 .../algebricks/algebricks-runtime/pom.xml          |  5 +++
 .../win/AbstractWindowNestedPlansPushRuntime.java  | 22 ++++---------
 .../operators/win/AbstractWindowPushRuntime.java   | 36 ++++++++++++----------
 .../win/WindowNestedPlansPushRuntime.java          | 17 ++++++----
 .../win/WindowNestedPlansRunningPushRuntime.java   |  8 +++--
 .../data/accessors/PointableTupleReference.java    | 21 +++++++++++++
 .../preclustered/PreclusteredGroupWriter.java      | 12 ++++++++
 8 files changed, 80 insertions(+), 43 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
index 3fffdac..6cba1b1 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/OperatorResourcesComputer.java
@@ -148,7 +148,7 @@ public class OperatorResourcesComputer {
     private long getWindowRequiredMemory(WindowOperator op) {
         WindowPOperator physOp = (WindowPOperator) op.getPhysicalOperator();
         // memory budget configuration only applies to window operators that materialize partitions (non-streaming)
-        // streaming window operators only need 2 frames: output + copy
+        // streaming window operators only need 2 frames: output + (conservative estimate) last frame partition columns
         long memorySize = physOp.isPartitionMaterialization() ? windowMemorySize : 2 * frameSize;
         return getOperatorRequiredMemory(op, memorySize);
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
index c8f7cbf..d6b2585 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/pom.xml
@@ -78,6 +78,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
index 85585a0..e16ee13 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowNestedPlansPushRuntime.java
@@ -28,11 +28,10 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
 
 /**
@@ -144,20 +143,11 @@ abstract class AbstractWindowNestedPlansPushRuntime extends WindowMaterializingP
         }
     }
 
-    static PointableTupleReference createPointables(int ln) {
-        IPointable[] pointables = new IPointable[ln];
-        for (int i = 0; i < ln; i++) {
-            pointables[i] = VoidPointable.FACTORY.createPointable();
-        }
-        return new PointableTupleReference(pointables);
-    }
-
-    static boolean allTrue(PointableTupleReference tupleRef, IBinaryBooleanInspector boolAccessor)
-            throws HyracksDataException {
-        for (int i = 0, ln = tupleRef.getFieldCount(); i < ln; i++) {
-            IPointable field = tupleRef.getField(i);
-            boolean b = boolAccessor.getBooleanValue(field.getByteArray(), field.getStartOffset(), field.getLength());
-            if (!b) {
+    static boolean allTrue(ITupleReference tupleRef, IBinaryBooleanInspector boolAccessor) throws HyracksDataException {
+        for (int i = 0, n = tupleRef.getFieldCount(); i < n; i++) {
+            boolean v = boolAccessor.getBooleanValue(tupleRef.getFieldData(i), tupleRef.getFieldStart(i),
+                    tupleRef.getFieldLength(i));
+            if (!v) {
                 return false;
             }
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
index 9cc25d0..0afe212 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/AbstractWindowPushRuntime.java
@@ -24,16 +24,17 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.IWindowAggregateEvaluator;
 import org.apache.hyracks.algebricks.runtime.operators.aggrun.AbstractRunningAggregatePushRuntime;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupWriter;
+import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
 
 public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregatePushRuntime<IWindowAggregateEvaluator> {
 
@@ -42,9 +43,9 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
     private final IBinaryComparatorFactory[] partitionComparatorFactories;
     private IBinaryComparator[] partitionComparators;
     private final IBinaryComparatorFactory[] orderComparatorFactories;
-    private IFrame copyFrame;
-    private FrameTupleAccessor copyFrameAccessor;
     private FrameTupleAccessor frameAccessor;
+    private FrameTupleReference partitionColumnsRef;
+    private PointableTupleReference partitionColumnsPrevCopy;
     private long frameId;
     private boolean inPartition;
 
@@ -60,7 +61,8 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
     }
 
     /**
-     * Number of frames reserved by this operator: {@link #frame}, {@link #copyFrame}
+     * Number of frames reserved by this operator: {@link #frame} + conservative estimate for
+     * {@link #partitionColumnsPrevCopy}
      */
     int getReservedFrameCount() {
         return 2;
@@ -78,9 +80,9 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
         super.init();
         partitionComparators = createBinaryComparators(partitionComparatorFactories);
         frameAccessor = new FrameTupleAccessor(inputRecordDesc);
-        copyFrame = new VSizeFrame(ctx);
-        copyFrameAccessor = new FrameTupleAccessor(inputRecordDesc);
-        copyFrameAccessor.reset(copyFrame.getBuffer());
+        partitionColumnsRef = new PermutingFrameTupleReference(partitionColumns);
+        partitionColumnsPrevCopy =
+                PointableTupleReference.create(partitionColumns.length, ArrayBackedValueStorage::new);
         IBinaryComparator[] orderComparators = createBinaryComparators(orderComparatorFactories);
         for (IWindowAggregateEvaluator runningAggEval : runningAggEvals) {
             runningAggEval.configure(orderComparators);
@@ -106,18 +108,19 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
         if (frameId == 0) {
             beginPartition();
         } else {
-            boolean samePartition = PreclusteredGroupWriter.sameGroup(copyFrameAccessor,
-                    copyFrameAccessor.getTupleCount() - 1, frameAccessor, 0, partitionColumns, partitionComparators);
+            partitionColumnsRef.reset(frameAccessor, 0);
+            boolean samePartition = PreclusteredGroupWriter.sameGroup(partitionColumnsPrevCopy, partitionColumnsRef,
+                    partitionComparators);
             if (!samePartition) {
                 endPartition();
                 beginPartition();
             }
         }
-        if (nTuple == 1) {
+        int tLastIndex = nTuple - 1;
+        if (tLastIndex == 0) {
             partitionChunk(frameId, buffer, 0, 0);
         } else {
             int tBeginIndex = 0;
-            int tLastIndex = nTuple - 1;
             for (int tIndex = 1; tIndex <= tLastIndex; tIndex++) {
                 boolean samePartition = PreclusteredGroupWriter.sameGroup(frameAccessor, tIndex - 1, frameAccessor,
                         tIndex, partitionColumns, partitionComparators);
@@ -131,9 +134,8 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
             partitionChunk(frameId, buffer, tBeginIndex, tLastIndex);
         }
 
-        copyFrame.resize(buffer.capacity());
-        FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
-        copyFrameAccessor.reset(copyFrame.getBuffer());
+        partitionColumnsRef.reset(frameAccessor, tLastIndex);
+        partitionColumnsPrevCopy.set(partitionColumnsRef);
         frameId++;
     }
 
@@ -167,7 +169,7 @@ public abstract class AbstractWindowPushRuntime extends AbstractRunningAggregate
         }
     }
 
-    protected static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
+    static IBinaryComparator[] createBinaryComparators(IBinaryComparatorFactory[] factories) {
         IBinaryComparator[] comparators = new IBinaryComparator[factories.length];
         for (int i = 0; i < factories.length; i++) {
             comparators[i] = factories[i].createBinaryComparator();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
index cb86c5f..1a57e27 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansPushRuntime.java
@@ -188,28 +188,33 @@ class WindowNestedPlansPushRuntime extends AbstractWindowNestedPlansPushRuntime
         if (frameValueExists) {
             frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
             frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
-            frameValuePointables = createPointables(frameValueEvalFactories.length);
+            frameValuePointables =
+                    PointableTupleReference.create(frameValueEvalFactories.length, VoidPointable.FACTORY);
         }
         if (frameStartExists) {
             frameStartEvals = createEvaluators(frameStartEvalFactories, ctx);
-            frameStartPointables = createPointables(frameStartEvalFactories.length);
+            frameStartPointables =
+                    PointableTupleReference.create(frameStartEvalFactories.length, VoidPointable.FACTORY);
         }
         if (frameStartValidationExists) {
             frameStartValidationEvals = createEvaluators(frameStartValidationEvalFactories, ctx);
-            frameStartValidationPointables = createPointables(frameStartValidationEvalFactories.length);
+            frameStartValidationPointables =
+                    PointableTupleReference.create(frameStartValidationEvalFactories.length, VoidPointable.FACTORY);
         }
         if (frameEndExists) {
             frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
-            frameEndPointables = createPointables(frameEndEvalFactories.length);
+            frameEndPointables = PointableTupleReference.create(frameEndEvalFactories.length, VoidPointable.FACTORY);
         }
         if (frameEndValidationExists) {
             frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
-            frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+            frameEndValidationPointables =
+                    PointableTupleReference.create(frameEndValidationEvalFactories.length, VoidPointable.FACTORY);
         }
         if (frameExcludeExists) {
             frameExcludeEvals = createEvaluators(frameExcludeEvalFactories, ctx);
             frameExcludeComparators = createBinaryComparators(frameExcludeComparatorFactories);
-            frameExcludePointables = createPointables(frameExcludeEvalFactories.length);
+            frameExcludePointables =
+                    PointableTupleReference.create(frameExcludeEvalFactories.length, VoidPointable.FACTORY);
             frameExcludePointable2 = VoidPointable.FACTORY.createPointable();
         }
         if (frameOffsetExists) {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
index 7ce9668..5ed6861 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/win/WindowNestedPlansRunningPushRuntime.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.accessors.PointableTupleReference;
@@ -117,12 +118,13 @@ final class WindowNestedPlansRunningPushRuntime extends AbstractWindowNestedPlan
         super.init();
         frameValueEvals = createEvaluators(frameValueEvalFactories, ctx);
         frameValueComparators = MultiComparator.create(frameValueComparatorFactories);
-        frameValuePointables = createPointables(frameValueEvalFactories.length);
+        frameValuePointables = PointableTupleReference.create(frameValueEvalFactories.length, VoidPointable.FACTORY);
         frameEndEvals = createEvaluators(frameEndEvalFactories, ctx);
-        frameEndPointables = createPointables(frameEndEvalFactories.length);
+        frameEndPointables = PointableTupleReference.create(frameEndEvalFactories.length, VoidPointable.FACTORY);
         if (frameEndValidationExists) {
             frameEndValidationEvals = createEvaluators(frameEndValidationEvalFactories, ctx);
-            frameEndValidationPointables = createPointables(frameEndValidationEvalFactories.length);
+            frameEndValidationPointables =
+                    PointableTupleReference.create(frameEndValidationEvalFactories.length, VoidPointable.FACTORY);
             booleanAccessor = booleanAccessorFactory.createBinaryBooleanInspector(ctx);
             nestedAggForInvalidFrame = nestedAggCreate();
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
index 1d947c1..09b1ecf 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/PointableTupleReference.java
@@ -19,7 +19,10 @@
 
 package org.apache.hyracks.dataflow.common.data.accessors;
 
+import java.util.function.Supplier;
+
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IPointableFactory;
 
 /**
  * A tuple reference implementation that holds fields in a {@link IPointable} array
@@ -55,4 +58,22 @@ public class PointableTupleReference implements ITupleReference {
     public IPointable getField(int fIdx) {
         return fields[fIdx];
     }
+
+    public void set(ITupleReference tupleRef) {
+        for (int i = 0; i < fields.length; i++) {
+            fields[i].set(tupleRef.getFieldData(i), tupleRef.getFieldStart(i), tupleRef.getFieldLength(i));
+        }
+    }
+
+    public static PointableTupleReference create(int fieldCount, Supplier<IPointable> fieldFactory) {
+        IPointable[] fields = new IPointable[fieldCount];
+        for (int i = 0; i < fieldCount; i++) {
+            fields[i] = fieldFactory.get();
+        }
+        return new PointableTupleReference(fields);
+    }
+
+    public static PointableTupleReference create(int fieldCount, IPointableFactory fieldFactory) {
+        return create(fieldCount, fieldFactory::createPointable);
+    }
 }
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index c018e9d..cb27011 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.dataflow.std.group.AggregateState;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
@@ -188,6 +189,17 @@ public class PreclusteredGroupWriter implements IFrameWriter {
         return true;
     }
 
+    public static boolean sameGroup(ITupleReference a1, ITupleReference a2, IBinaryComparator[] comparators)
+            throws HyracksDataException {
+        for (int i = 0; i < comparators.length; ++i) {
+            if (comparators[i].compare(a1.getFieldData(i), a1.getFieldStart(i), a1.getFieldLength(i),
+                    a2.getFieldData(i), a2.getFieldStart(i), a2.getFieldLength(i)) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         isFailed = true;