You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/03/13 23:19:19 UTC
[1/2] git commit: DRILL-31 WindowFrame operator
DRILL-31 WindowFrame operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b41f51f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b41f51f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b41f51f1
Branch: refs/heads/master
Commit: b41f51f1c6cd786f14f0d50066a67a76169b9d1e
Parents: cb3132a
Author: Timothy Chen <tn...@gmail.com>
Authored: Sun Feb 17 23:41:57 2013 -0800
Committer: Timothy Chen <tn...@gmail.com>
Committed: Mon Mar 4 02:22:26 2013 -0800
----------------------------------------------------------------------
.../common/logical/data/SingleInputOperator.java | 2 +-
.../drill/common/logical/data/WindowFrame.java | 4 +-
.../org/apache/drill/exec/ref/UnbackedRecord.java | 3 +-
.../apache/drill/exec/ref/rops/WindowFrameROP.java | 321 +++++++++++++++
.../apache/drill/exec/ref/rops/WindowingROP.java | 56 ---
.../drill/exec/ref/rops/WindowFrameROPTest.java | 201 +++++++++
6 files changed, 526 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
index 1310a79..0acfbef 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/SingleInputOperator.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
* SimpleOperator is an operator that has one inputs at most.
*/
-public abstract class SingleInputOperator extends LogicalOperatorBase{
+public abstract class SingleInputOperator extends LogicalOperatorBase {
private LogicalOperator input;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
index 72a7ffd..ec14663 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/data/WindowFrame.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
index bc6ae0e..6152a32 100644
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/UnbackedRecord.java
@@ -77,9 +77,8 @@ public class UnbackedRecord implements RecordPointer {
@Override
public RecordPointer copy() {
- // TODO: Make a deep copy.
UnbackedRecord r = new UnbackedRecord();
- r.root = this.root;
+ r.root = this.root.copy();
return r;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
new file mode 100644
index 0000000..a8417d1
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowFrameROP.java
@@ -0,0 +1,321 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.values.DataValue;
+import org.apache.drill.exec.ref.values.ScalarValues;
+
+import java.util.List;
+import java.util.Queue;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class WindowFrameROP extends SingleInputROPBase<WindowFrame> {
+
+ private RecordIterator incoming;
+ private FieldReference segmentRef;
+ private FieldReference positionRef;
+ private FieldReference withinRef;
+ private boolean withinConstrained;
+
+ public WindowFrameROP(WindowFrame config) {
+ super(config);
+ WindowFrame.FrameRef frameRef = config.getFrame();
+ if (frameRef != null) {
+ positionRef = frameRef.getPosition();
+ segmentRef = frameRef.getSegment();
+ }
+
+ if (positionRef == null) {
+ positionRef = new FieldReference("ref.position");
+ }
+
+ if (segmentRef == null) {
+ segmentRef = new FieldReference("ref.segment");
+ }
+
+ withinRef = config.getWithin();
+ withinConstrained = withinRef != null;
+ }
+
+ @Override
+ protected void setInput(RecordIterator incoming) {
+ this.incoming = incoming;
+ }
+
+ @Override
+ protected RecordIterator getIteratorInternal() {
+ return new WindowIterator(config.getStart(), config.getEnd());
+ }
+
+ private class Window {
+ List<WindowObjectHolder> holders;
+ int windowId;
+ int nextRecordIndex;
+ int lastIndex;
+ int nextPosition;
+
+ private Window(long start, long end, Window lastWindow) {
+ this.holders = Lists.newArrayList();
+ if (lastWindow != null) {
+ this.windowId = lastWindow.getWindowId() + 1;
+ lastIndex = (int) (windowId + end);
+ final int lastMinIndex = (int) Math.max(windowId + start, 0);
+ Iterable<WindowObjectHolder> lastHolders = Iterables.filter(lastWindow.getHolders(), new Predicate<WindowObjectHolder>() {
+ @Override
+ public boolean apply(WindowObjectHolder windowObjectHolder) {
+ return windowObjectHolder.getIndex() >= lastMinIndex;
+ }
+ });
+ for (WindowObjectHolder holder : lastHolders) {
+ holder.setPosition(nextPosition());
+ addRecord(holder);
+ }
+ } else {
+ this.windowId = 0;
+ lastIndex = (int) (windowId + end);
+ }
+
+ }
+
+ private int getWindowId() {
+ return windowId;
+ }
+
+ private boolean isFull() {
+ if (holders.isEmpty()) {
+ return false;
+ }
+
+ WindowObjectHolder lastHolder = holders.get(holders.size() - 1);
+ return lastHolder.getIndex() >= lastIndex;
+ }
+
+ private void addRecord(RecordPointer pointer, int index, boolean schemaChanged) {
+ addRecord(new WindowObjectHolder(pointer, nextPosition(), index, schemaChanged));
+ }
+
+ private void addRecord(WindowObjectHolder holder) {
+ if (!isFull()) {
+ holders.add(holder);
+ } else {
+ throw new DrillRuntimeException("Adding more records into windows then configured.");
+ }
+ }
+
+ private int nextPosition() {
+ int position = nextPosition;
+ if (nextPosition == 0) {
+ nextPosition = 1;
+ } else if (nextPosition > 0) {
+ nextPosition = -nextPosition;
+ } else {
+ nextPosition = -nextPosition + 1;
+ }
+ return position;
+ }
+
+ public List<WindowObjectHolder> getHolders() {
+ return holders;
+ }
+
+ public WindowObjectHolder nextRecord() {
+ if (nextRecordIndex >= holders.size()) {
+ return null;
+ } else {
+ return holders.get(nextRecordIndex++);
+ }
+ }
+
+ public boolean isCrossedWithinBoundary(RecordPointer nextRecord) {
+ if (withinConstrained && nextRecord != null && !holders.isEmpty()) {
+ DataValue lastWithinVal = holders.get(holders.size() - 1).getPointer().getField(withinRef);
+ DataValue nextWithinVal = nextRecord.getField(withinRef);
+ boolean lastIsNull = lastWithinVal == null;
+ boolean nextIsNull = nextWithinVal == null;
+ return lastIsNull != nextIsNull || (!nextIsNull && !nextWithinVal.equals(lastWithinVal));
+ }
+
+ return false;
+ }
+
+ public void removeHoldersBeforeIndex(final int index) {
+ Iterables.removeIf(holders, new Predicate<WindowObjectHolder>() {
+ @Override
+ public boolean apply(WindowObjectHolder windowObjectHolder) {
+ return windowObjectHolder.getIndex() <= index;
+ }
+ });
+ }
+ }
+
+ private class WindowObjectHolder {
+ private final RecordPointer pointer;
+ private int position;
+ private final int index;
+ private final boolean schemaChanged;
+ private int windowId;
+
+ private WindowObjectHolder(RecordPointer pointer, int position, int index, boolean schemaChanged) {
+ this.pointer = pointer;
+ this.position = position;
+ this.index = index;
+ this.schemaChanged = schemaChanged;
+ }
+
+ public WindowObjectHolder setWindowId(int windowId) {
+ this.windowId = windowId;
+ return this;
+ }
+
+ public RecordPointer getPointer() {
+ return pointer;
+ }
+
+ public int getPosition() {
+ return position;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public boolean isSchemaChanged() {
+ return schemaChanged;
+ }
+
+ public int getWindowId() {
+ return windowId;
+ }
+
+ public void setPosition(int position) {
+ this.position = position;
+ }
+ }
+
+ private class WindowManager {
+ Queue<WindowObjectHolder> holderBuffer;
+ Window curWindow;
+ long start;
+ long end;
+
+ WindowManager(long start, long end) {
+ holderBuffer = Queues.newLinkedBlockingDeque();
+ this.start = start;
+ this.end = end;
+ }
+
+ private WindowObjectHolder nextRecord() {
+ if (curWindow != null) {
+ WindowObjectHolder holder = curWindow.nextRecord();
+ if (holder != null) {
+ return holder.setWindowId(curWindow.getWindowId());
+ } else if (!holderBuffer.isEmpty()) {
+ WindowObjectHolder obj = holderBuffer.poll();
+ addRecord(obj.getPointer(), obj.getIndex(), obj.isSchemaChanged());
+ return nextRecord();
+ }
+ }
+
+ return null;
+ }
+
+ public void addRecord(RecordPointer recordPointer, int index, boolean schemaChanged) {
+ if (curWindow == null || curWindow.isFull()) {
+ curWindow = new Window(start, end, curWindow);
+ } else if (curWindow.isCrossedWithinBoundary(recordPointer)) {
+ if (index - 1 == curWindow.getWindowId()) {
+ curWindow.removeHoldersBeforeIndex(curWindow.getWindowId());
+ }
+ curWindow = new Window(start, end, curWindow);
+ if (curWindow.isCrossedWithinBoundary(recordPointer)) {
+ holderBuffer.add(new WindowObjectHolder(recordPointer, 0, index, schemaChanged));
+ return;
+ }
+ }
+
+ curWindow.addRecord(recordPointer, index, schemaChanged);
+ }
+
+ public boolean hasMoreWindows(int curIndex) {
+ return curWindow != null && curWindow.getWindowId() < curIndex;
+ }
+
+ public WindowObjectHolder nextWindowOnEmpty() {
+ curWindow = new Window(start, end, curWindow);
+ return nextRecord();
+ }
+ }
+
+
+ private class WindowIterator implements RecordIterator {
+ private int curIndex = -1;
+ private WindowManager windowManager;
+ private ProxySimpleRecord windowPointer;
+
+ public WindowIterator(long start, long end) {
+ checkArgument(end - start >= 0, "Invalid end and start. end: %s, start: %s", end, start);
+ windowManager = new WindowManager(start, end);
+ windowPointer = new ProxySimpleRecord();
+ }
+
+ @Override
+ public RecordPointer getRecordPointer() {
+ return windowPointer;
+ }
+
+ @Override
+ public NextOutcome next() {
+ WindowObjectHolder holder = windowManager.nextRecord();
+ if (holder == null) {
+ NextOutcome outcome = incoming.next();
+ if (outcome != NextOutcome.NONE_LEFT) {
+ windowManager.addRecord(incoming.getRecordPointer().copy(), ++curIndex, outcome == NextOutcome.INCREMENTED_SCHEMA_CHANGED);
+ holder = windowManager.nextRecord();
+ } else if (windowManager.hasMoreWindows(curIndex)) {
+ holder = windowManager.nextWindowOnEmpty();
+ }
+
+ if (holder == null) {
+ return NextOutcome.NONE_LEFT;
+ }
+ }
+
+ RecordPointer newPointer = holder.getPointer().copy();
+ newPointer.addField(positionRef, new ScalarValues.IntegerScalar(holder.getPosition()));
+ newPointer.addField(segmentRef, new ScalarValues.IntegerScalar(holder.getWindowId()));
+ windowPointer.setRecord(newPointer);
+ return holder.isSchemaChanged() ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+ }
+
+ @Override
+ public ROP getParent() {
+ return WindowFrameROP.this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java b/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
deleted file mode 100644
index bbfbc1c..0000000
--- a/sandbox/prototype/exec/ref/src/main/java/org/apache/drill/exec/ref/rops/WindowingROP.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.drill.exec.ref.rops;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.drill.common.logical.data.SingleInputOperator;
-import org.apache.drill.exec.ref.RecordIterator;
-import org.apache.drill.exec.ref.RecordPointer;
-
-/**
- * For simplification purposes, the Windowing reference implementation takes the lazy approach of finishing a window
- * before it outputs any values from that window. While this is necessary in the ALL:ALL scenario, other scenarios could
- * be implemented more efficiently with an appropriately size open window.
- */
-public class WindowingROP extends SingleInputROPBase {
-
- private List<RecordPointer> records = new LinkedList<RecordPointer>();
- private WindowManager[] windows;
- private Window[] windowPerKey;
-
- // the place where we should start the next batch.
- private int internalWindowPosition;
-
- public WindowingROP(SingleInputOperator config) {
- super(config);
- throw new NotImplementedException();
- }
-
- @Override
- protected void setInput(RecordIterator incoming) {
- }
-
- @Override
- protected RecordIterator getIteratorInternal() {
- return null;
- }
-
- private class Window {
-
- int ending;
- List<RecordPointer> records = new ArrayList<RecordPointer>();
-
- public void reset(int curPos) {
-
- }
- }
-
- private class WindowManager {
- private void increment() {
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b41f51f1/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
new file mode 100644
index 0000000..d041174
--- /dev/null
+++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/WindowFrameROPTest.java
@@ -0,0 +1,201 @@
+package org.apache.drill.exec.ref.rops;
+
+import com.google.common.collect.Lists;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.WindowFrame;
+import org.apache.drill.exec.ref.RecordIterator;
+import org.apache.drill.exec.ref.RecordPointer;
+import org.apache.drill.exec.ref.TestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+
+public class WindowFrameROPTest {
+ final String input = "" +
+ "{id: 0}" +
+ "{id: 1}" +
+ "{id: 2}" +
+ "{id: 3}" +
+ "{id: 4}";
+
+ @Test
+ public void windowShouldWorkWithBefore() throws IOException {
+ WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 0L));
+ RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+ rop.setInput(incoming);
+ RecordIterator out = rop.getOutput();
+
+ List<WindowObj> windows = Lists.newArrayList(
+ new WindowObj(0, 0, 0),
+ new WindowObj(1, 0, 0),
+ new WindowObj(1, 1, 1),
+ new WindowObj(2, 0, 0),
+ new WindowObj(2, 1, 1),
+ new WindowObj(2, 2, -1),
+ new WindowObj(3, 1, 0),
+ new WindowObj(3, 2, 1),
+ new WindowObj(3, 3, -1),
+ new WindowObj(4, 2, 0),
+ new WindowObj(4, 3, 1),
+ new WindowObj(4, 4, -1)
+ );
+
+ verifyWindowOrder(windows, out);
+ }
+
+ @Test
+ public void windowShouldWorkWithAfter() throws IOException {
+ WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, 0L, 2L));
+ RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+ rop.setInput(incoming);
+ RecordIterator out = rop.getOutput();
+
+ List<WindowObj> windows = Lists.newArrayList(
+ new WindowObj(0, 0, 0),
+ new WindowObj(0, 1, 1),
+ new WindowObj(0, 2, -1),
+ new WindowObj(1, 1, 0),
+ new WindowObj(1, 2, 1),
+ new WindowObj(1, 3, -1),
+ new WindowObj(2, 2, 0),
+ new WindowObj(2, 3, 1),
+ new WindowObj(2, 4, -1),
+ new WindowObj(3, 3, 0),
+ new WindowObj(3, 4, 1),
+ new WindowObj(4, 4, 0)
+ );
+
+ verifyWindowOrder(windows, out);
+ }
+
+ @Test
+ public void windowShouldWorkWithBeforeAndAfter() throws IOException {
+ WindowFrameROP rop = new WindowFrameROP(new WindowFrame(null, null, -2L, 2L));
+ RecordIterator incoming = TestUtils.jsonToRecordIterator("test", input);
+ rop.setInput(incoming);
+ RecordIterator out = rop.getOutput();
+
+ List<WindowObj> windows = Lists.newArrayList(
+ new WindowObj(0, 0, 0),
+ new WindowObj(0, 1, 1),
+ new WindowObj(0, 2, -1),
+ new WindowObj(1, 0, 0),
+ new WindowObj(1, 1, 1),
+ new WindowObj(1, 2, -1),
+ new WindowObj(1, 3, 2),
+ new WindowObj(2, 0, 0),
+ new WindowObj(2, 1, 1),
+ new WindowObj(2, 2, -1),
+ new WindowObj(2, 3, 2),
+ new WindowObj(2, 4, -2),
+ new WindowObj(3, 1, 0),
+ new WindowObj(3, 2, 1),
+ new WindowObj(3, 3, -1),
+ new WindowObj(3, 4, 2),
+ new WindowObj(4, 2, 0),
+ new WindowObj(4, 3, 1),
+ new WindowObj(4, 4, -1)
+ );
+
+ verifyWindowOrder(windows, out);
+ }
+
+ @Test
+ public void windowShouldNotCrossWithin() throws IOException {
+ String withinInput = "" +
+ "{id: 0, v: 0}" +
+ "{id: 1, v: 1}" +
+ "{id: 2, v: 2}" +
+ "{id: 3, v: 3}" +
+ "{id: 4, v: 4}";
+ WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -2L, 2L));
+ RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput);
+ rop.setInput(incoming);
+ RecordIterator out = rop.getOutput();
+
+ List<WindowObj> windows = Lists.newArrayList(
+ new WindowObj(0, 0, 0),
+ new WindowObj(1, 1, 0),
+ new WindowObj(2, 2, 0),
+ new WindowObj(3, 3, 0),
+ new WindowObj(4, 4, 0)
+ );
+
+ verifyWindowOrder(windows, out);
+ }
+
+ @Test
+ public void windowShouldNotCrossWithinAndRange() throws IOException {
+ String withinInput = "" +
+ "{id: 0, v: 0}" +
+ "{id: 1, v: 0}" +
+ "{id: 2, v: 1}" +
+ "{id: 3, v: 1}" +
+ "{id: 4, v: 2}";
+ WindowFrameROP rop = new WindowFrameROP(new WindowFrame(new FieldReference("test.v"), null, -1L, 2L));
+ RecordIterator incoming = TestUtils.jsonToRecordIterator("test", withinInput);
+ rop.setInput(incoming);
+ RecordIterator out = rop.getOutput();
+
+ List<WindowObj> windows = Lists.newArrayList(
+ new WindowObj(0, 0, 0),
+ new WindowObj(0, 1, 1),
+ new WindowObj(1, 0, 0),
+ new WindowObj(1, 1, 1),
+ new WindowObj(2, 2, 0),
+ new WindowObj(2, 3, 1),
+ new WindowObj(3, 2, 0),
+ new WindowObj(3, 3, 1),
+ new WindowObj(4, 4, 0)
+ );
+
+ verifyWindowOrder(windows, out);
+ }
+
+ private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out) {
+ verifyWindowOrder(expectedIds, out, new SchemaPath("ref.segment"), new SchemaPath("ref.position"));
+ }
+
+ private void verifyWindowOrder(List<WindowObj> expectedIds, RecordIterator out, SchemaPath segment, SchemaPath position) {
+ RecordIterator.NextOutcome outcome = out.next();
+ RecordPointer pointer = out.getRecordPointer();
+ int count = 0;
+ SchemaPath id = new SchemaPath("test.id");
+ int expectedSize = expectedIds.size();
+ while (outcome != RecordIterator.NextOutcome.NONE_LEFT) {
+ count += 1;
+ WindowObj windowObj = expectedIds.get(count - 1);
+ //System.out.println(windowObj);
+ assertEquals("Id mismatch", windowObj.id, pointer.getField(id).getAsNumeric().getAsInt());
+ assertEquals("Window id mismatch", windowObj.windowId, pointer.getField(segment).getAsNumeric().getAsInt());
+ assertEquals("Position mismatch", windowObj.position, pointer.getField(position).getAsNumeric().getAsInt());
+ outcome = out.next();
+ }
+ assertEquals(expectedSize, count);
+ }
+
+ private class WindowObj {
+ int id;
+ int position;
+ int windowId;
+
+ private WindowObj(int windowId, int id, int position) {
+ this.id = id;
+ this.position = position;
+ this.windowId = windowId;
+ }
+
+ @Override
+ public String toString() {
+ return "WindowObj{" +
+ "id=" + id +
+ ", position=" + position +
+ ", windowId=" + windowId +
+ '}';
+ }
+ }
+}