You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/03/13 23:19:19 UTC

[1/2] git commit: DRILL-31 WindowFrame operator

DRILL-31 WindowFrame operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b41f51f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b41f51f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b41f51f1

Branch: refs/heads/master
Commit: b41f51f1c6cd786f14f0d50066a67a76169b9d1e
Parents: cb3132a
Author: Timothy Chen <tn...@gmail.com>
Authored: Sun Feb 17 23:41:57 2013 -0800
Committer: Timothy Chen <tn...@gmail.com>
Committed: Mon Mar 4 02:22:26 2013 -0800

----------------------------------------------------------------------
 .../common/logical/data/SingleInputOperator.java   |    2 +-
 .../drill/common/logical/data/WindowFrame.java     |    4 +-
 .../org/apache/drill/exec/ref/UnbackedRecord.java  |    3 +-
 .../apache/drill/exec/ref/rops/WindowFrameROP.java |  321 +++++++++++++++
 .../apache/drill/exec/ref/rops/WindowingROP.java   |   56 ---
 .../drill/exec/ref/rops/WindowFrameROPTest.java    |  201 +++++++++
 6 files changed, 526 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
index 1310a79..0acfbef 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  * SimpleOperator is an operator that has one inputs at most.
  */
-public abstract class SingleInputOperator extends LogicalOperatorBase{
+public abstract class SingleInputOperator extends LogicalOperatorBase {
 
   private LogicalOperator input;
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
index 72a7ffd..ec14663 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index bc6ae0e..6152a32 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -77,9 +77,8 @@ public class UnbackedRecord implements RecordPointer {
 
     @Override
     public RecordPointer copy() {
-        // TODO: Make a deep copy.
         UnbackedRecord r = new UnbackedRecord();
-        r.root = this.root;
+        r.root = this.root.copy();
         return r;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
new file mode 100644
index 0000000..a8417d1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
@@ -0,0 +1,321 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues;
+
+import java.util.List;
+import java.util.Queue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class WindowFrameROP extends SingleInputROPBase<WindowFrame> {
+
+    private RecordIterator incoming;
+    private FieldReference segmentRef;
+    private FieldReference positionRef;
+    private FieldReference withinRef;
+    private boolean withinConstrained;
+
+    public WindowFrameROP(WindowFrame config) { // Resolves the output field references and the optional "within" field from config.
+        super(config);
+        WindowFrame.FrameRef frameRef = config.getFrame();
+        if (frameRef != null) { // a missing frame section is tolerated; the defaults below apply
+            positionRef = frameRef.getPosition();
+            segmentRef = frameRef.getSegment();
+        }
+
+        if (positionRef == null) { // default field that receives each record's position within its window
+            positionRef = new FieldReference("ref.position");
+        }
+
+        if (segmentRef == null) { // default field that receives the id of the window a record is emitted for
+            segmentRef = new FieldReference("ref.segment");
+        }
+
+        withinRef = config.getWithin();
+        withinConstrained = withinRef != null; // boundary checks on the within field run only when one is configured
+    }
+
+    @Override
+    protected void setInput(RecordIterator incoming) { // remembers the upstream iterator that WindowIterator.next() pulls records from
+        this.incoming = incoming;
+    }
+
+    @Override
+    protected RecordIterator getIteratorInternal() { // creates a new WindowIterator over the configured start/end offsets on every call
+        return new WindowIterator(config.getStart(), config.getEnd());
+    }
+
+    private class Window { // One window: holds the records whose indices fall in [windowId + start, windowId + end].
+        List<WindowObjectHolder> holders; // records of this window, in the order they were added
+        int windowId; // 0 for the first window, previous window's id + 1 afterwards
+        int nextRecordIndex; // read cursor into holders, advanced by nextRecord()
+        int lastIndex; // windowId + end; the window is full once it holds a record with at least this index
+        int nextPosition; // value the next call to nextPosition() will hand out
+
+        private Window(long start, long end, Window lastWindow) { // lastWindow is null for the very first window
+            this.holders = Lists.newArrayList();
+            if (lastWindow != null) {
+                this.windowId = lastWindow.getWindowId() + 1;
+                lastIndex = (int) (windowId + end);
+                final int lastMinIndex = (int) Math.max(windowId + start, 0); // lowest record index still inside this window
+                Iterable<WindowObjectHolder> lastHolders = Iterables.filter(lastWindow.getHolders(), new Predicate<WindowObjectHolder>() {
+                    @Override
+                    public boolean apply(WindowObjectHolder windowObjectHolder) {
+                        return windowObjectHolder.getIndex() >= lastMinIndex;
+                    }
+                });
+                for (WindowObjectHolder holder : lastHolders) { // carry over the overlapping records; the same holder objects are reused
+                    holder.setPosition(nextPosition()); // positions are handed out afresh in every window
+                    addRecord(holder);
+                }
+            } else {
+                this.windowId = 0;
+                lastIndex = (int) (windowId + end);
+            }
+
+        }
+
+        private int getWindowId() {
+            return windowId;
+        }
+
+        private boolean isFull() { // an empty window is never full
+            if (holders.isEmpty()) {
+                return false;
+            }
+
+            WindowObjectHolder lastHolder = holders.get(holders.size() - 1);
+            return lastHolder.getIndex() >= lastIndex;
+        }
+
+        private void addRecord(RecordPointer pointer, int index, boolean schemaChanged) { // wraps the record with the next position, then appends it
+            addRecord(new WindowObjectHolder(pointer, nextPosition(), index, schemaChanged)); // nextPosition() advances even if the append below throws
+        }
+
+        private void addRecord(WindowObjectHolder holder) { // appends holder; throws DrillRuntimeException when the window is already full
+            if (!isFull()) {
+                holders.add(holder);
+            } else {
+                throw new DrillRuntimeException("Adding more records into windows than configured.");
+            }
+        }
+
+        private int nextPosition() { // successive calls yield 0, 1, -1, 2, -2, 3, ...
+            int position = nextPosition;
+            if (nextPosition == 0) {
+                nextPosition = 1;
+            } else if (nextPosition > 0) {
+                nextPosition = -nextPosition; // after +k comes -k
+            } else {
+                nextPosition = -nextPosition + 1; // after -k comes k + 1
+            }
+            return position;
+        }
+
+        public List<WindowObjectHolder> getHolders() { // returns the live list, not a copy
+            return holders;
+        }
+
+        public WindowObjectHolder nextRecord() { // returns the next unread holder, or null once every holder has been read
+            if (nextRecordIndex >= holders.size()) {
+                return null;
+            } else {
+                return holders.get(nextRecordIndex++);
+            }
+        }
+
+        public boolean isCrossedWithinBoundary(RecordPointer nextRecord) { // true when nextRecord's within value differs from the last held record's
+            if (withinConstrained && nextRecord != null && !holders.isEmpty()) { // nothing to compare against otherwise
+                DataValue lastWithinVal = holders.get(holders.size() - 1).getPointer().getField(withinRef);
+                DataValue nextWithinVal = nextRecord.getField(withinRef);
+                boolean lastIsNull = lastWithinVal == null;
+                boolean nextIsNull = nextWithinVal == null;
+                return lastIsNull != nextIsNull || (!nextIsNull && !nextWithinVal.equals(lastWithinVal)); // exactly one missing, or both present and unequal
+            }
+
+            return false;
+        }
+
+        public void removeHoldersBeforeIndex(final int index) { // NOTE: inclusive despite the name; the holder at index itself is removed too
+            Iterables.removeIf(holders, new Predicate<WindowObjectHolder>() {
+                @Override
+                public boolean apply(WindowObjectHolder windowObjectHolder) {
+                    return windowObjectHolder.getIndex() <= index;
+                }
+            });
+        }
+    }
+
+    private class WindowObjectHolder { // A buffered input record plus the bookkeeping needed to emit it for a window.
+        private final RecordPointer pointer; // the buffered record
+        private int position; // position within the window it currently belongs to; reassigned when carried into a new window
+        private final int index; // index of the record in the incoming stream
+        private final boolean schemaChanged; // whether the incoming iterator reported a schema change for this record
+        private int windowId; // id of the window the holder was last emitted for, set via setWindowId()
+
+        private WindowObjectHolder(RecordPointer pointer, int position, int index, boolean schemaChanged) {
+            this.pointer = pointer;
+            this.position = position;
+            this.index = index;
+            this.schemaChanged = schemaChanged;
+        }
+
+        public WindowObjectHolder setWindowId(int windowId) { // returns this so the call can be chained
+            this.windowId = windowId;
+            return this;
+        }
+
+        public RecordPointer getPointer() {
+            return pointer;
+        }
+
+        public int getPosition() {
+            return position;
+        }
+
+        public int getIndex() {
+            return index;
+        }
+
+        public boolean isSchemaChanged() {
+            return schemaChanged;
+        }
+
+        public int getWindowId() {
+            return windowId;
+        }
+
+        public void setPosition(int position) {
+            this.position = position;
+        }
+    }
+
+    private class WindowManager { // Owns the current Window, rolls over to new windows, and hands out records to emit.
+        Queue<WindowObjectHolder> holderBuffer; // records deferred by addRecord(); re-submitted by nextRecord()
+        Window curWindow; // null until the first record is added
+        long start; // start offset passed to every Window
+        long end; // end offset passed to every Window
+
+        WindowManager(long start, long end) {
+            holderBuffer = Queues.newLinkedBlockingDeque();
+            this.start = start;
+            this.end = end;
+        }
+
+        private WindowObjectHolder nextRecord() { // next holder to emit, tagged with the current window id, or null if none is available
+            if (curWindow != null) {
+                WindowObjectHolder holder = curWindow.nextRecord();
+                if (holder != null) {
+                    return holder.setWindowId(curWindow.getWindowId());
+                } else if (!holderBuffer.isEmpty()) { // current window is exhausted: re-submit one deferred record and try again
+                    WindowObjectHolder obj = holderBuffer.poll();
+                    addRecord(obj.getPointer(), obj.getIndex(), obj.isSchemaChanged()); // the deferred holder's position is not reused
+                    return nextRecord();
+                }
+            }
+
+            return null;
+        }
+
+        public void addRecord(RecordPointer recordPointer, int index, boolean schemaChanged) { // places the record in the current or a new window, or defers it
+            if (curWindow == null || curWindow.isFull()) { // first record, or the current window reached its last index: roll over
+                curWindow = new Window(start, end, curWindow);
+            } else if (curWindow.isCrossedWithinBoundary(recordPointer)) { // within value differs from the window's last record
+                if (index - 1 == curWindow.getWindowId()) { // the preceding record's index equals the current window's id
+                    curWindow.removeHoldersBeforeIndex(curWindow.getWindowId()); // drop holders with index <= windowId so the next window does not inherit them
+                }
+                curWindow = new Window(start, end, curWindow);
+                if (curWindow.isCrossedWithinBoundary(recordPointer)) { // still across the boundary from what the new window inherited
+                    holderBuffer.add(new WindowObjectHolder(recordPointer, 0, index, schemaChanged)); // position 0 is a placeholder; see nextRecord()
+                    return;
+                }
+            }
+
+            curWindow.addRecord(recordPointer, index, schemaChanged);
+        }
+
+        public boolean hasMoreWindows(int curIndex) { // true while the current window id is below curIndex
+            return curWindow != null && curWindow.getWindowId() < curIndex;
+        }
+
+        public WindowObjectHolder nextWindowOnEmpty() { // advances to the next window without adding a record and returns its first holder, if any
+            curWindow = new Window(start, end, curWindow);
+            return nextRecord();
+        }
+    }
+
+
+    private class WindowIterator implements RecordIterator { // Emits each buffered record once per window it belongs to, annotated with position and segment.
+        private int curIndex = -1; // index of the most recent record read from incoming; -1 before the first read
+        private WindowManager windowManager;
+        private ProxySimpleRecord windowPointer; // the single pointer object handed to callers; re-targeted on every next()
+
+        public WindowIterator(long start, long end) {
+            checkArgument(end - start >= 0, "Invalid end and start. end: %s, start: %s", end, start); // rejects an end offset that lies before the start offset
+            windowManager = new WindowManager(start, end);
+            windowPointer = new ProxySimpleRecord();
+        }
+
+        @Override
+        public RecordPointer getRecordPointer() {
+            return windowPointer;
+        }
+
+        @Override
+        public NextOutcome next() { // returns NONE_LEFT only when neither the windows nor incoming yield another holder
+            WindowObjectHolder holder = windowManager.nextRecord(); // first drain what the current window still has to emit
+            if (holder == null) {
+                NextOutcome outcome = incoming.next();
+                if (outcome != NextOutcome.NONE_LEFT) {
+                    windowManager.addRecord(incoming.getRecordPointer().copy(), ++curIndex, outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED); // buffer a copy of the upstream record
+                    holder = windowManager.nextRecord();
+                } else if (windowManager.hasMoreWindows(curIndex)) { // input exhausted, but later records still have windows of their own to emit
+                    holder = windowManager.nextWindowOnEmpty();
+                }
+
+                if (holder == null) {
+                    return NextOutcome.NONE_LEFT;
+                }
+            }
+
+            RecordPointer newPointer = holder.getPointer().copy(); // the fields below are added to a copy rather than to the held record
+            newPointer.addField(positionRef, new ScalarValues.IntegerScalar(holder.getPosition()));
+            newPointer.addField(segmentRef, new ScalarValues.IntegerScalar(holder.getWindowId()));
+            windowPointer.setRecord(newPointer);
+            return holder.isSchemaChanged() ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+        }
+
+        @Override
+        public ROP getParent() {
+            return WindowFrameROP.this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
deleted file mode 100644
index bbfbc1c..0000000
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.drill.exec.ref.rops;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.drill.common.logical.data.SingleInputOperator;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-
-/**
- * For simplification purposes, the Windowing reference implementation takes the lazy approach of finishing a window
- * before it outputs any values from that window. While this is necessary in the ALL:ALL scenario, other scenarios could
- * be implemented more efficiently with an appropriately size open window.
- */
-public class WindowingROP extends SingleInputROPBase {
-
-  private List<RecordPointer> records = new LinkedList<RecordPointer>();
-  private WindowManager[] windows;
-  private Window[] windowPerKey;
-
-  // the place where we should start the next batch.
-  private int internalWindowPosition;
-
-  public WindowingROP(SingleInputOperator config) {
-    super(config);
-    throw new NotImplementedException();
-  }
-
-  @Override
-  protected void setInput(RecordIterator incoming) {
-  }
-
-  @Override
-  protected RecordIterator getIteratorInternal() {
-    return null;
-  }
-
-  private class Window {
-
-    int ending;
-    List<RecordPointer> records = new ArrayList<RecordPointer>();
-
-    public void reset(int curPos) {
-
-    }
-  }
-
-  private class WindowManager {
-    private void increment() {
-
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
new file mode 100644
index 0000000..d041174
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
@@ -0,0 +1,201 @@
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.TestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+
+public class WindowFrameROPTest {
+    final String input = "" +
+            "{id: 0}" +
+            "{id: 1}" +
+            "{id: 2}" +
+            "{id: 3}" +
+            "{id: 4}";
+
+    @Test
+    public void windowShouldWorkWithBefore() throws IOException { // offsets -2..0: each window holds its own record and up to two preceding ones
+        WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 0L));
+        RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+        rop.setInput(incoming);
+        RecordIterator out = rop.getOutput();
+
+        List<WindowObj> windows = Lists.newArrayList( // expected output in order; arguments are (windowId, id, position)
+                new WindowObj(0, 0, 0),
+                new WindowObj(1, 0, 0),
+                new WindowObj(1, 1, 1),
+                new WindowObj(2, 0, 0),
+                new WindowObj(2, 1, 1),
+                new WindowObj(2, 2, -1),
+                new WindowObj(3, 1, 0),
+                new WindowObj(3, 2, 1),
+                new WindowObj(3, 3, -1),
+                new WindowObj(4, 2, 0),
+                new WindowObj(4, 3, 1),
+                new WindowObj(4, 4, -1)
+        );
+
+        verifyWindowOrder(windows, out);
+    }
+
+    @Test
+    public void windowShouldWorkWithAfter() throws IOException { // offsets 0..2: each window holds its own record and up to two following ones
+        WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, 0L, 2L));
+        RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+        rop.setInput(incoming);
+        RecordIterator out = rop.getOutput();
+
+        List<WindowObj> windows = Lists.newArrayList( // expected output in order; arguments are (windowId, id, position)
+                new WindowObj(0, 0, 0),
+                new WindowObj(0, 1, 1),
+                new WindowObj(0, 2, -1),
+                new WindowObj(1, 1, 0),
+                new WindowObj(1, 2, 1),
+                new WindowObj(1, 3, -1),
+                new WindowObj(2, 2, 0),
+                new WindowObj(2, 3, 1),
+                new WindowObj(2, 4, -1),
+                new WindowObj(3, 3, 0), // the last two windows shrink because the input ends at id 4
+                new WindowObj(3, 4, 1),
+                new WindowObj(4, 4, 0)
+        );
+
+        verifyWindowOrder(windows, out);
+    }
+
+    @Test
+    public void windowShouldWorkWithBeforeAndAfter() throws IOException { // offsets -2..2: up to two records on either side of each window's own record
+        WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 2L));
+        RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+        rop.setInput(incoming);
+        RecordIterator out = rop.getOutput();
+
+        List<WindowObj> windows = Lists.newArrayList( // expected output in order; arguments are (windowId, id, position)
+                new WindowObj(0, 0, 0),
+                new WindowObj(0, 1, 1),
+                new WindowObj(0, 2, -1),
+                new WindowObj(1, 0, 0),
+                new WindowObj(1, 1, 1),
+                new WindowObj(1, 2, -1),
+                new WindowObj(1, 3, 2),
+                new WindowObj(2, 0, 0), // only window 2 is complete: all five records fit
+                new WindowObj(2, 1, 1),
+                new WindowObj(2, 2, -1),
+                new WindowObj(2, 3, 2),
+                new WindowObj(2, 4, -2),
+                new WindowObj(3, 1, 0),
+                new WindowObj(3, 2, 1),
+                new WindowObj(3, 3, -1),
+                new WindowObj(3, 4, 2),
+                new WindowObj(4, 2, 0),
+                new WindowObj(4, 3, 1),
+                new WindowObj(4, 4, -1)
+        );
+
+        verifyWindowOrder(windows, out);
+    }
+
+    @Test
+    public void windowShouldNotCrossWithin() throws IOException { // every record has a distinct v, so no window may hold more than its own record
+        String withinInput = "" +
+                "{id: 0, v: 0}" +
+                "{id: 1, v: 1}" +
+                "{id: 2, v: 2}" +
+                "{id: 3, v: 3}" +
+                "{id: 4, v: 4}";
+        WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -2L, 2L)); // first argument is presumably the within field - confirm against WindowFrame
+        RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput);
+        rop.setInput(incoming);
+        RecordIterator out = rop.getOutput();
+
+        List<WindowObj> windows = Lists.newArrayList( // expected output in order; arguments are (windowId, id, position)
+                new WindowObj(0, 0, 0),
+                new WindowObj(1, 1, 0),
+                new WindowObj(2, 2, 0),
+                new WindowObj(3, 3, 0),
+                new WindowObj(4, 4, 0)
+        );
+
+        verifyWindowOrder(windows, out);
+    }
+
+    @Test
+    public void windowShouldNotCrossWithinAndRange() throws IOException { // v groups records as {0,1}, {2,3}, {4}; windows must stay inside one group
+        String withinInput = "" +
+                "{id: 0, v: 0}" +
+                "{id: 1, v: 0}" +
+                "{id: 2, v: 1}" +
+                "{id: 3, v: 1}" +
+                "{id: 4, v: 2}";
+        WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -1L, 2L)); // first argument is presumably the within field - confirm against WindowFrame
+        RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput);
+        rop.setInput(incoming);
+        RecordIterator out = rop.getOutput();
+
+        List<WindowObj> windows = Lists.newArrayList( // expected output in order; arguments are (windowId, id, position)
+                new WindowObj(0, 0, 0),
+                new WindowObj(0, 1, 1),
+                new WindowObj(1, 0, 0),
+                new WindowObj(1, 1, 1),
+                new WindowObj(2, 2, 0),
+                new WindowObj(2, 3, 1),
+                new WindowObj(3, 2, 0),
+                new WindowObj(3, 3, 1),
+                new WindowObj(4, 4, 0)
+        );
+
+        verifyWindowOrder(windows, out);
+    }
+
+    private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out) { // checks against the operator's default output fields
+        verifyWindowOrder(expectedIds, out, new SchemaPath("ref.segment"), new SchemaPath("ref.position"));
+    }
+
+    private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out, SchemaPath segment, SchemaPath position) { // drains out and compares each record with expectedIds, in order
+        RecordIterator.NextOutcome outcome = out.next();
+        RecordPointer pointer = out.getRecordPointer(); // fetched once; the loop relies on the iterator re-targeting this same pointer on each next()
+        int count = 0;
+        SchemaPath id = new SchemaPath("test.id");
+        int expectedSize = expectedIds.size();
+        while (outcome != RecordIterator.NextOutcome.NONE_LEFT) {
+            count += 1;
+            WindowObj windowObj = expectedIds.get(count - 1); // NOTE(review): surplus output surfaces as IndexOutOfBoundsException here, not as an assertion failure
+            //System.out.println(windowObj);
+            assertEquals("Id mismatch", windowObj.id, pointer.getField(id).getAsNumeric().getAsInt());
+            assertEquals("Window id mismatch", windowObj.windowId, pointer.getField(segment).getAsNumeric().getAsInt());
+            assertEquals("Position mismatch", windowObj.position, pointer.getField(position).getAsNumeric().getAsInt());
+            outcome = out.next();
+        }
+        assertEquals(expectedSize, count); // catches output that ends too early
+    }
+
+    private class WindowObj { // Expected values for one emitted record.
+        int id; // compared with the record's test.id field
+        int position; // compared with the position output field
+        int windowId; // compared with the segment output field
+
+        private WindowObj(int windowId, int id, int position) { // note the argument order: window id first, then record id, then position
+            this.id = id;
+            this.position = position;
+            this.windowId = windowId;
+        }
+
+        @Override
+        public String toString() {
+            return "WindowObj{" +
+                    "id=" + id +
+                    ", position=" + position +
+                    ", windowId=" + windowId +
+                    '}';
+        }
+    }
+}