You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/14 03:52:46 UTC
[06/13] Update typing system. Update RPC system. Add Fragmenting
Implementation. Working single node. Distributed failing due to threading
issues.
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
deleted file mode 100644
index 83ad599..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-/**
- * Describes a vector which holds a number of true/false values.
- */
-public class BitVector extends AbstractFixedValueVector<BitVector> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitVector.class);
-
- private final MaterializedField field;
-
- public BitVector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, 1);
- this.field = new MaterializedField(fieldId, DataType.BOOLEAN, false, ValueMode.VECTOR, this.getClass());
- }
-
- @Override
- public MaterializedField getField() {
- return field;
- }
-
-// /** Returns true or false for the specified bit index.
-// * The index should be less than the OpenBitSet size
-// */
-// public boolean get(int index) {
-// assert index >= 0 && index < this.valueCount;
-// int i = index >> 3; // div 8
-// // signed shift will keep a negative index and force an
-// // array-index-out-of-bounds-exception, removing the need for an explicit check.
-// int bit = index & 0x3f; // mod 64
-// long bitmask = 1L << bit;
-// return (data.getLong(i) & bitmask) != 0;
-// }
-
- public int getBit(int index) {
-
- assert index >= 0 && index < this.valueCount;
- int i = 8*(index >> 6); // div 8
- int bit = index & 0x3f; // mod 64
- return ((int) (data.getLong(i) >>> bit)) & 0x01;
- }
-
- /** Sets the bit at the specified index.
- * The index should be less than the OpenBitSet size.
- */
- public void set(int index) {
- assert index >= 0 && index < this.valueCount;
- int wordNum = index >> 3;
- int bit = index & 0x3f;
- long bitmask = 1L << bit;
- data.setLong(wordNum, data.getLong(wordNum) | bitmask);
- }
-
- public void clear(int index) {
- assert index >= 0 && index < this.valueCount;
- int wordNum = index >> 3;
- int bit = index & 0x03f;
- long bitmask = 1L << bit;
- data.setLong(wordNum, data.getLong(wordNum) & ~bitmask);
- }
-
-
-
- /** Clears a range of bits. Clearing past the end does not change the size of the set.
- *
- * @param startBitIndex lower index
- * @param lastBitIndex one-past the last bit to clear
- */
- private void clear2(int startBitIndex, int lastBitIndex) {
- if (lastBitIndex <= startBitIndex) return;
-
- int firstWordStart = (startBitIndex>>3);
- if (firstWordStart >= this.longWords) return;
-
- // since endIndex is one past the end, this is index of the last
- // word to be changed.
- int lastWordStart = ((lastBitIndex-1)>>3);
-
- long startmask = -1L << startBitIndex;
- long endmask = -1L >>> -lastBitIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
-
- // invert masks since we are clearing
- startmask = ~startmask;
- endmask = ~endmask;
-
- if (firstWordStart == lastWordStart) {
- data.setLong(firstWordStart, data.getLong(firstWordStart) & (startmask | endmask));
- return;
- }
- data.setLong(firstWordStart, data.getLong(firstWordStart) & startmask);
-
- int middle = Math.min(this.longWords, lastWordStart);
-
- for(int i =firstWordStart+8; i < middle; i += 8){
- data.setLong(i, 0L);
- }
- if (lastWordStart < this.longWords) {
- data.setLong(lastWordStart, data.getLong(lastWordStart) & endmask);
- }
- }
-
- public void setAllFalse(){
- clear(0, valueCount);
- }
-
-
- public void clear(int startIndex, int endIndex) {
- if (endIndex <= startIndex) return;
-
- int startWord = (startIndex >> 6);
- if (startWord >= longWords) return;
-
- // since endIndex is one past the end, this is index of the last
- // word to be changed.
- int endWord = ((endIndex - 1) >> 6);
-
- long startmask = -1L << startIndex;
- long endmask = -1L >>> -endIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
-
- // invert masks since we are clearing
- startmask = ~startmask;
- endmask = ~endmask;
-
- int startWordPos = startWord * 8;
- if (startWord == endWord) {
- data.setLong(startWordPos, data.getLong(startWordPos) & (startmask | endmask));
- return;
- }
-
- int endWordPos = endWord * 8;
-
- data.setLong(startWordPos, data.getLong(startWordPos) & startmask);
-
- int middle = Math.min(longWords, endWord)*8;
-
-
- for(int i =startWordPos+8; i < middle; i += 8){
- data.setLong(i, 0L);
- }
-
- if (endWordPos < startWordPos) {
- data.setLong(endWordPos, data.getLong(endWordPos) & endmask);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
deleted file mode 100644
index d8e1c80..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-
-public class ByteVector extends AbstractFixedValueVector<ByteVector>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteVector.class);
-
- private final MaterializedField field;
-
- public ByteVector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, 8);
- this.field = new MaterializedField(fieldId, DataType.SIGNED_BYTE, false, ValueMode.VECTOR, this.getClass());
- }
-
- @Override
- public MaterializedField getField() {
- return field;
- }
-
- public void setByte(int index, byte b){
- data.setByte(index, b);
- }
-
- public byte getByte(int index){
- return data.getByte(index);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
new file mode 100644
index 0000000..82c86d1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed1 extends AbstractFixedValueVector<Fixed1>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed1.class);
+
+ public Fixed1(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 8);
+ }
+
+ public void setByte(int index, byte b){
+ data.setByte(index, b);
+ }
+
+ public byte getByte(int index){
+ return data.getByte(index);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return getByte(index);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
new file mode 100644
index 0000000..c5f641a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed12 extends AbstractFixedValueVector<Fixed12>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed12.class);
+
+ public Fixed12(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 12*8);
+ }
+
+
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
new file mode 100644
index 0000000..649832b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed16 extends AbstractFixedValueVector<Fixed16>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed16.class);
+
+ public Fixed16(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 16*8);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
new file mode 100644
index 0000000..bd0e313
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed2 extends AbstractFixedValueVector<Fixed2>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed2.class);
+
+ public Fixed2(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 2*8);
+ }
+
+ public final void setSmallInt(int index, short value){
+ index*=2;
+ data.setShort(index, value);
+ }
+
+ public final short getSmallInt(int index){
+ index*=2;
+ return data.getShort(index);
+ }
+
+ public final void setUInt2(int index, short value){
+ setSmallInt(index, value);
+ }
+
+ public final short getUInt2(int index){
+ return getSmallInt(index);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return getSmallInt(index);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
new file mode 100644
index 0000000..650029d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed4 extends AbstractFixedValueVector<Fixed4>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed4.class);
+
+ public Fixed4(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 4*8);
+ }
+
+ public final void setInt(int index, int value){
+ index*=4;
+ data.setInt(index, value);
+ }
+
+ public final int getInt(int index){
+ index*=4;
+ return data.getInt(index);
+ }
+
+ public final void setFloat4(int index, float value){
+ index*=8;
+ data.setFloat(index, value);
+ }
+
+ public final float getFloat4(int index){
+ index*=8;
+ return data.getFloat(index);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return getInt(index);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
new file mode 100644
index 0000000..3629f5c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class Fixed8 extends AbstractFixedValueVector<Fixed8>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed8.class);
+
+ public Fixed8(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, 8*8);
+ }
+
+ public final void setBigInt(int index, long value){
+ index*=8;
+ data.setLong(index, value);
+ }
+
+ public final long getBigInt(int index){
+ index*=8;
+ return data.getLong(index);
+ }
+
+ public final void setFloat8(int index, double value){
+ index*=8;
+ data.setDouble(index, value);
+ }
+
+ public final double getFloat8(int index){
+ index*=8;
+ return data.getDouble(index);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return getBigInt(index);
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
new file mode 100644
index 0000000..594af23
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class FixedLen extends AbstractFixedValueVector<FixedLen>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FixedLen.class);
+
+
+ public FixedLen(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, field.getWidth());
+ }
+
+ public void set(ByteBuf b){
+
+ }
+
+ public void get(ByteBuf b){
+
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
deleted file mode 100644
index 779b01b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class Int16Vector extends AbstractFixedValueVector<Int16Vector>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Int16Vector.class);
-
- private final MaterializedField field;
-
- public Int16Vector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, 32);
- this.field = new MaterializedField(fieldId, DataType.INT16, false, ValueMode.VECTOR, this.getClass());
- }
-
- @Override
- public MaterializedField getField() {
- return field;
- }
-
- public final void set(int index, short value){
- index*=2;
- data.setShort(index, value);
- }
-
- public final short get(int index){
- index*=2;
- return data.getShort(index);
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
deleted file mode 100644
index d142367..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class Int32Vector extends AbstractFixedValueVector<Int32Vector>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Int32Vector.class);
-
- private final MaterializedField field;
-
- public Int32Vector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, 32);
- this.field = new MaterializedField(fieldId, DataType.INT32, false, ValueMode.VECTOR, this.getClass());
- }
-
- @Override
- public MaterializedField getField() {
- return field;
- }
-
- public final void set(int index, int value){
- index*=4;
- data.setInt(index, value);
- }
-
- public final int get(int index){
- index*=4;
- return data.getInt(index);
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
new file mode 100644
index 0000000..cc18538
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public final class NullableFixed4 extends NullableValueVector<NullableFixed4, Fixed4>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableFixed4.class);
+
+ public NullableFixed4(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator, NullableFixed4.class);
+ }
+
+ @Override
+ protected Fixed4 getNewValueVector(BufferAllocator allocator) {
+ return new Fixed4(null, allocator);
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
deleted file mode 100644
index 372de13..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class NullableInt32Vector extends NullableValueVector<NullableInt32Vector, Int32Vector>{
-
- public NullableInt32Vector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, NullableInt32Vector.class);
- }
-
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableInt32Vector.class);
-
-
- public int get(int index){
- return this.value.get(index);
- }
-
- public void set(int index, int value){
- this.value.set(index, value);
- }
-
-
- @Override
- protected Int32Vector getNewValueVector(int fieldId, BufferAllocator allocator) {
- return new Int32Vector(fieldId, allocator);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
index 8e714ed..692ab87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.MaterializedField;
/**
@@ -28,18 +29,16 @@ import org.apache.drill.exec.record.MaterializedField;
abstract class NullableValueVector<T extends NullableValueVector<T, E>, E extends BaseValueVector<E>> extends BaseValueVector<T> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableValueVector.class);
- protected BitVector bits;
+ protected Bit bits;
protected E value;
- private final MaterializedField field;
- public NullableValueVector(int fieldId, BufferAllocator allocator, Class<T> valueClass) {
- super(fieldId, allocator);
- bits = new BitVector(fieldId, allocator);
- value = getNewValueVector(fieldId, allocator);
- this.field = value.getField().getNullableVersion(valueClass);
+ public NullableValueVector(MaterializedField field, BufferAllocator allocator, Class<T> valueClass) {
+ super(field, allocator);
+ bits = new Bit(null, allocator);
+ value = getNewValueVector(allocator);
}
- protected abstract E getNewValueVector(int fieldId, BufferAllocator allocator);
+ protected abstract E getNewValueVector(BufferAllocator allocator);
public int isNull(int index){
return bits.getBit(index);
@@ -76,5 +75,26 @@ abstract class NullableValueVector<T extends NullableValueVector<T, E>, E extend
}
+ @Override
+ public ByteBuf[] getBuffers() {
+ return new ByteBuf[]{bits.data, value.data};
+ }
+
+ @Override
+ public void setRecordCount(int recordCount) {
+ super.setRecordCount(recordCount);
+ bits.setRecordCount(recordCount);
+ value.setRecordCount(recordCount);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ if(isNull(index) == 0){
+ return null;
+ }else{
+ return value.getObject(index);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
new file mode 100644
index 0000000..2c08551
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class RepeatMap extends BaseValueVector<RepeatMap>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatMap.class);
+
+
+ public RepeatMap(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ }
+
+ @Override
+ protected int getAllocationSize(int valueCount) {
+ return 4 * valueCount;
+ }
+
+ @Override
+ protected void childResetAllocation(int valueCount, ByteBuf buf) {
+ }
+
+ @Override
+ protected void childCloneMetadata(RepeatMap other) {
+ }
+
+ @Override
+ protected void childClear() {
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
index e9faa93..323b55f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
@@ -20,12 +20,16 @@ package org.apache.drill.exec.record.vector;
import io.netty.buffer.ByteBufAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
-public class SelectionVector extends UInt16Vector{
+/**
+ * Convenience/Clarification Fixed2 wrapper.
+ */
+public class SelectionVector extends Fixed2{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector.class);
- public SelectionVector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator);
+ public SelectionVector(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
new file mode 100644
index 0000000..8e89c41
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
@@ -0,0 +1,250 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class TypeHelper {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
+
+ private static final int WIDTH_ESTIMATE_1 = 10;
+ private static final int WIDTH_ESTIMATE_2 = 50000;
+ private static final int WIDTH_ESTIMATE_4 = 1024*1024;
+
+ public static int getSize(MajorType major){
+ switch(major.getMinorType()){
+ case TINYINT: return 1;
+ case SMALLINT: return 2;
+ case INT: return 4;
+ case BIGINT: return 8;
+ case DECIMAL4: return 4;
+ case DECIMAL8: return 8;
+ case DECIMAL12: return 12;
+ case DECIMAL16: return 16;
+ case MONEY: return 8;
+ case DATE: return 4;
+ case TIME: return 8;
+ case TIMETZ: return 12;
+ case TIMESTAMP: return 8;
+ case DATETIME: return 8;
+ case INTERVAL: return 12;
+ case FLOAT4: return 4;
+ case FLOAT8: return 8;
+ case BOOLEAN: return 1/8;
+ case FIXEDCHAR: return major.getWidth();
+ case VARCHAR1: return 1 + WIDTH_ESTIMATE_1;
+ case VARCHAR2: return 2 + WIDTH_ESTIMATE_2;
+ case VARCHAR4: return 4 + WIDTH_ESTIMATE_4;
+ case FIXEDBINARY: return major.getWidth();
+ case VARBINARY1: return 1 + WIDTH_ESTIMATE_1;
+ case VARBINARY2: return 2 + WIDTH_ESTIMATE_2;
+ case VARBINARY4: return 4 + WIDTH_ESTIMATE_4;
+ case UINT1: return 1;
+ case UINT2: return 2;
+ case UINT4: return 4;
+ case UINT8: return 8;
+ case PROTO2: return 2 + WIDTH_ESTIMATE_2;
+ case PROTO4: return 4 + WIDTH_ESTIMATE_4;
+ case MSGPACK2: return 2 + WIDTH_ESTIMATE_2;
+ case MSGPACK4: return 4 + WIDTH_ESTIMATE_4;
+ }
+ return 4;
+ }
+
+ public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
+ switch(mode){
+ case OPTIONAL:
+ switch(type){
+ case REPEATMAP: return RepeatMap.class;
+ case TINYINT: return Fixed1.class;
+ case SMALLINT: return Fixed2.class;
+ case INT: return Fixed4.class;
+ case BIGINT: return Fixed8.class;
+ case DECIMAL4: return Fixed4.class;
+ case DECIMAL8: return Fixed8.class;
+ case DECIMAL12: return Fixed12.class;
+ case DECIMAL16: return Fixed16.class;
+ case MONEY: return Fixed8.class;
+ case DATE: return Fixed4.class;
+ case TIME: return Fixed8.class;
+ case TIMETZ: return Fixed12.class;
+ case TIMESTAMP: return Fixed8.class;
+ case DATETIME: return Fixed8.class;
+ case INTERVAL: return Fixed12.class;
+ case FLOAT4: return Fixed4.class;
+ case FLOAT8: return Fixed8.class;
+ case BOOLEAN: return Bit.class;
+ case FIXEDCHAR: return FixedLen.class;
+ case VARCHAR1: return VarLen1.class;
+ case VARCHAR2: return VarLen2.class;
+ case VARCHAR4: return VarLen4.class;
+ case FIXEDBINARY: return FixedLen.class;
+ case VARBINARY1: return VarLen1.class;
+ case VARBINARY2: return VarLen2.class;
+ case VARBINARY4: return VarLen4.class;
+ case UINT1: return Fixed1.class;
+ case UINT2: return Fixed2.class;
+ case UINT4: return Fixed4.class;
+ case UINT8: return Fixed8.class;
+ case PROTO2: return VarLen2.class;
+ case PROTO4: return VarLen4.class;
+ case MSGPACK2: return VarLen2.class;
+ case MSGPACK4: return VarLen4.class;
+ }
+ break;
+ case REQUIRED:
+ switch(type){
+// case TINYINT: return NullableFixed1.class;
+// case SMALLINT: return NullableFixed2.class;
+// case INT: return NullableFixed4.class;
+// case BIGINT: return NullableFixed8.class;
+// case DECIMAL4: return NullableFixed4.class;
+// case DECIMAL8: return NullableFixed8.class;
+// case DECIMAL12: return NullableFixed12.class;
+// case DECIMAL16: return NullableFixed16.class;
+// case MONEY: return NullableFixed8.class;
+// case DATE: return NullableFixed4.class;
+// case TIME: return NullableFixed8.class;
+// case TIMETZ: return NullableFixed12.class;
+// case TIMESTAMP: return NullableFixed8.class;
+// case DATETIME: return NullableFixed8.class;
+// case INTERVAL: return NullableFixed12.class;
+// case FLOAT4: return NullableFixed4.class;
+// case FLOAT8: return NullableFixed8.class;
+// case BOOLEAN: return NullableBit.class;
+// case FIXEDCHAR: return NullableFixedLen.class;
+// case VARCHAR1: return NullableVarLen1.class;
+// case VARCHAR2: return NullableVarLen2.class;
+// case VARCHAR4: return NullableVarLen4.class;
+// case FIXEDBINARY: return NullableFixedLen.class;
+// case VARBINARY1: return NullableVarLen1.class;
+// case VARBINARY2: return NullableVarLen2.class;
+// case VARBINARY4: return NullableVarLen4.class;
+// case UINT1: return NullableFixed1.class;
+// case UINT2: return NullableFixed2.class;
+// case UINT4: return NullableFixed4.class;
+// case UINT8: return NullableFixed8.class;
+// case PROTO2: return NullableVarLen2.class;
+// case PROTO4: return NullableVarLen4.class;
+// case MSGPACK2: return NullableVarLen2.class;
+// case MSGPACK4: return NullableVarLen4.class;
+ }
+ break;
+ case REPEATED:
+ switch(type){
+// case TINYINT: return RepeatedFixed1.class;
+// case SMALLINT: return RepeatedFixed2.class;
+// case INT: return RepeatedFixed4.class;
+// case BIGINT: return RepeatedFixed8.class;
+// case DECIMAL4: return RepeatedFixed4.class;
+// case DECIMAL8: return RepeatedFixed8.class;
+// case DECIMAL12: return RepeatedFixed12.class;
+// case DECIMAL16: return RepeatedFixed16.class;
+// case MONEY: return RepeatedFixed8.class;
+// case DATE: return RepeatedFixed4.class;
+// case TIME: return RepeatedFixed8.class;
+// case TIMETZ: return RepeatedFixed12.class;
+// case TIMESTAMP: return RepeatedFixed8.class;
+// case DATETIME: return RepeatedFixed8.class;
+// case INTERVAL: return RepeatedFixed12.class;
+// case FLOAT4: return RepeatedFixed4.class;
+// case FLOAT8: return RepeatedFixed8.class;
+// case BOOLEAN: return RepeatedBit.class;
+// case FIXEDCHAR: return RepeatedFixedLen.class;
+// case VARCHAR1: return RepeatedVarLen1.class;
+// case VARCHAR2: return RepeatedVarLen2.class;
+// case VARCHAR4: return RepeatedVarLen4.class;
+// case FIXEDBINARY: return RepeatedFixedLen.class;
+// case VARBINARY1: return RepeatedVarLen1.class;
+// case VARBINARY2: return RepeatedVarLen2.class;
+// case VARBINARY4: return RepeatedVarLen4.class;
+// case UINT1: return RepeatedFixed1.class;
+// case UINT2: return RepeatedFixed2.class;
+// case UINT4: return RepeatedFixed4.class;
+// case UINT8: return RepeatedFixed8.class;
+// case PROTO2: return RepeatedVarLen2.class;
+// case PROTO4: return RepeatedVarLen4.class;
+// case MSGPACK2: return RepeatedVarLen2.class;
+// case MSGPACK4: return RepeatedVarLen4.class;
+ }
+ break;
+ default:
+ break;
+
+ }
+ throw new UnsupportedOperationException();
+ }
+
+
+ public static ValueVector<?> getNewVector(MaterializedField field, BufferAllocator allocator){
+ MajorType type = field.getType();
+ switch(type.getMode()){
+ case REQUIRED:
+ switch(type.getMinorType()){
+ case TINYINT: return new Fixed1(field, allocator);
+ case SMALLINT: return new Fixed2(field, allocator);
+ case INT: return new Fixed4(field, allocator);
+ case BIGINT: return new Fixed8(field, allocator);
+ case DECIMAL4: return new Fixed4(field, allocator);
+ case DECIMAL8: return new Fixed8(field, allocator);
+ case DECIMAL12: return new Fixed12(field, allocator);
+ case DECIMAL16: return new Fixed16(field, allocator);
+ case MONEY: return new Fixed8(field, allocator);
+ case DATE: return new Fixed4(field, allocator);
+ case TIME: return new Fixed8(field, allocator);
+ case TIMETZ: return new Fixed12(field, allocator);
+ case TIMESTAMP: return new Fixed8(field, allocator);
+ case DATETIME: return new Fixed8(field, allocator);
+ case INTERVAL: return new Fixed12(field, allocator);
+ case FLOAT4: return new Fixed4(field, allocator);
+ case FLOAT8: return new Fixed8(field, allocator);
+ case BOOLEAN: return new Bit(field, allocator);
+ case FIXEDCHAR: return new FixedLen(field, allocator);
+ case VARCHAR1: return new VarLen1(field, allocator);
+ case VARCHAR2: return new VarLen2(field, allocator);
+ case VARCHAR4: return new VarLen4(field, allocator);
+ case FIXEDBINARY: return new FixedLen(field, allocator);
+ case VARBINARY1: return new VarLen1(field, allocator);
+ case VARBINARY2: return new VarLen2(field, allocator);
+ case VARBINARY4: return new VarLen4(field, allocator);
+ case UINT1: return new Fixed1(field, allocator);
+ case UINT2: return new Fixed2(field, allocator);
+ case UINT4: return new Fixed4(field, allocator);
+ case UINT8: return new Fixed8(field, allocator);
+ case PROTO2: return new VarLen2(field, allocator);
+ case PROTO4: return new VarLen4(field, allocator);
+ case MSGPACK2: return new VarLen2(field, allocator);
+ case MSGPACK4: return new VarLen4(field, allocator);
+ }
+ break;
+ case REPEATED:
+ break;
+ case OPTIONAL:
+ break;
+ default:
+ break;
+
+ }
+ throw new UnsupportedOperationException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
deleted file mode 100644
index 87c306b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public class UInt16Vector extends AbstractFixedValueVector<Int32Vector>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UInt16Vector.class);
-
- private final MaterializedField field;
-
- public UInt16Vector(int fieldId, BufferAllocator allocator) {
- super(fieldId, allocator, 16);
- this.field = new MaterializedField(fieldId, DataType.UINT16, false, ValueMode.VECTOR, this.getClass());
- }
-
- @Override
- public MaterializedField getField() {
- return field;
- }
-
- public final void set(int index, char value){
- index*=2;
- data.setChar(index, value);
- }
-
- public final char get(int index){
- index*=2;
- return data.getChar(index);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
index 76b0e90..8a5a822 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import java.io.Closeable;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.MaterializedField;
/**
@@ -44,6 +45,13 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
public abstract void allocateNew(int valueCount);
/**
+ * Update the value vector to the provided record information.
+ * @param metadata
+ * @param data
+ */
+ public abstract void setTo(FieldMetadata metadata, ByteBuf data);
+
+ /**
* Zero copy move of data from this vector to the target vector. Any future access to this vector without being
* populated by a new vector will cause problems.
*
@@ -52,19 +60,19 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
public abstract void transferTo(T vector);
/**
- * Return the underlying buffer. Note that this doesn't impact the reference counts for this buffer so it only should be
+ * Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for this buffer so it only should be
* used for in context access. Also note that this buffer changes regularly thus external classes shouldn't hold a
- * reference to it.
+ * reference to it (unless they change it).
*
* @return The underlying ByteBuf.
*/
- public abstract ByteBuf getBuffer();
+ public abstract ByteBuf[] getBuffers();
/**
- * Returns the number of value contained within this vector.
+ * Returns the maximum number of values contained within this vector.
* @return Vector size
*/
- public abstract int size();
+ public abstract int capacity();
/**
@@ -79,4 +87,32 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
*/
public abstract MaterializedField getField();
+ /**
+ * Define the number of records that are in this value vector.
+ * @param recordCount Number of records active in this vector. Used for purposes such as getting a writable range of the data.
+ */
+ public abstract void setRecordCount(int recordCount);
+ public abstract int getRecordCount();
+
+
+ /**
+ * Get the metadata for this field.
+ * @return
+ */
+ public abstract FieldMetadata getMetadata();
+
+ /**
+ * Debug interface to get values per record.
+ * @param index The record index.
+ * @return The value in the vector.
+ */
+ public Object getObject(int index);
+
+
+ /**
+ * Useful for generating random data.
+ */
+ public void randomizeData();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
new file mode 100644
index 0000000..d87029d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class VarLen1 extends VariableVector<VarLen1, Fixed1>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen1.class);
+
+ public VarLen1(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ }
+
+ @Override
+ protected Fixed1 getNewLengthVector(BufferAllocator allocator) {
+ return new Fixed1(null, allocator);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
new file mode 100644
index 0000000..ebd440a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class VarLen2 extends VariableVector<VarLen2, Fixed2>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen2.class);
+
+ public VarLen2(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ }
+
+ @Override
+ protected Fixed2 getNewLengthVector(BufferAllocator allocator) {
+ return new Fixed2(null, allocator);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
new file mode 100644
index 0000000..b3cd712
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class VarLen4 extends VariableVector<VarLen4, Fixed4>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen4.class);
+
+ public VarLen4(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ }
+
+ @Override
+ protected Fixed4 getNewLengthVector(BufferAllocator allocator) {
+ return new Fixed4(null, allocator);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
index dd84c94..4247f14 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.record.MaterializedField;
/**
* A vector of variable length bytes. Constructed as a vector of lengths or positions and a vector of values. Random access is only possible if the variable vector stores positions as opposed to lengths.
@@ -29,18 +30,16 @@ public abstract class VariableVector<T extends VariableVector<T, E>, E extends B
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VariableVector.class);
- protected E lengthVector;
+ protected final E lengthVector;
private ByteBuf values = DeadBuf.DEAD_BUFFER;
protected int expectedValueLength;
- private final boolean hasPositions;
- public VariableVector(int fieldId, BufferAllocator allocator, boolean hasPositions) {
- super(fieldId, allocator);
- this.lengthVector = getNewLengthVector(fieldId, allocator);
- this.hasPositions = hasPositions;
+ public VariableVector(MaterializedField field, BufferAllocator allocator) {
+ super(field, allocator);
+ this.lengthVector = getNewLengthVector(allocator);
}
- protected abstract E getNewLengthVector(int fieldId, BufferAllocator allocator);
+ protected abstract E getNewLengthVector(BufferAllocator allocator);
@Override
protected int getAllocationSize(int valueCount) {
@@ -67,12 +66,28 @@ public abstract class VariableVector<T extends VariableVector<T, E>, E extends B
values.release();
values = DeadBuf.DEAD_BUFFER;
}
- }
+ }
+
- public boolean hasPositions(){
- return hasPositions;
+ @Override
+ public ByteBuf[] getBuffers() {
+ return new ByteBuf[]{lengthVector.data, values};
}
+
+ @Override
+ public void setRecordCount(int recordCount) {
+ super.setRecordCount(recordCount);
+ lengthVector.setRecordCount(recordCount);
+ }
+ public void setTotalBytes(int totalBytes){
+ values.writerIndex(totalBytes);
+ }
+
+ @Override
+ public Object getObject(int index) {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
new file mode 100644
index 0000000..859d385
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
+ ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
+
+ protected final EnumLite handshakeType;
+ protected final Parser<T> parser;
+ protected int coordinationId;
+
+ public AbstractHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
+ super();
+ this.handshakeType = handshakeType;
+ this.parser = parser;
+ }
+
+ @Override
+ public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
+ coordinationId = inbound.coordinationId;
+ ctx.channel().pipeline().remove(this);
+ if (inbound.rpcType != handshakeType.getNumber())
+ throw new RpcException(String.format("Handshake failure. Expected %s[%d] but received number [%d]",
+ handshakeType, handshakeType.getNumber(), inbound.rpcType));
+
+ T msg = parser.parseFrom(inbound.getProtobufBodyAsIS());
+ consumeHandshake(ctx.channel(), msg);
+
+ }
+
+ protected abstract void consumeHandshake(Channel c, T msg) throws Exception;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
new file mode 100644
index 0000000..a241880
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+
+public class Acks {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Acks.class);
+
+ public static final Ack OK = Ack.newBuilder().setOk(true).build();
+ public static final Ack FAIL = Ack.newBuilder().setOk(false).build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index c62d445..0ff2b9d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -18,23 +18,34 @@
package org.apache.drill.exec.rpc;
import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
-public abstract class BasicClient<T extends EnumLite> extends RpcBus<T> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
private Bootstrap b;
private volatile boolean connect = false;
+ protected R connection;
+ private EventLoopGroup eventLoop;
- public BasicClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ super(rpcMapping);
+ this.eventLoop = eventLoopGroup;
+
b = new Bootstrap() //
.group(eventLoopGroup) //
.channel(NioSocketChannel.class) //
@@ -42,40 +53,132 @@ public abstract class BasicClient<T extends EnumLite> extends RpcBus<T> {
.option(ChannelOption.SO_RCVBUF, 1 << 17) //
.option(ChannelOption.SO_SNDBUF, 1 << 17) //
.handler(new ChannelInitializer<SocketChannel>() {
-
+
@Override
protected void initChannel(SocketChannel ch) throws Exception {
- ch.closeFuture().addListener(getCloseHandler(ch));
-
+ logger.debug("initializing client connection.");
+ connection = initRemoteConnection(ch);
+ ch.closeFuture().addListener(getCloseHandler(connection));
+
ch.pipeline().addLast( //
new ZeroCopyProtobufLengthDecoder(), //
- new RpcDecoder(), //
- new RpcEncoder(), //
- new InboundHandler(ch), //
+ new RpcDecoder(rpcConfig.getName()), //
+ new RpcEncoder(rpcConfig.getName()), //
+ getHandshakeHandler(), //
+ new InboundHandler(connection), //
new RpcExceptionHandler() //
);
- channel = ch;
connect = true;
}
}) //
-
- ;
+
+ ;
+ }
+
+ protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
+
+ protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
+ private Class<T> responseType;
+
+ public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
+ super(handshakeType, parser);
+ this.responseType = responseType;
+ }
+
+ @Override
+ protected final void consumeHandshake(Channel c, T msg) throws Exception {
+ validateHandshake(msg);
+ queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
+ }
+
+ protected abstract void validateHandshake(T msg) throws Exception;
+
+ }
+
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
+ return new ChannelClosedHandler();
+ }
+
+ protected final <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFutureImpl<RECEIVE> send(
+ T connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ throw new UnsupportedOperationException(
+ "This shouldn't be used in client mode as a client only has a single connection.");
+ }
+
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
+ SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+ return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
}
@Override
public boolean isClient() {
return true;
}
-
- public ChannelFuture connectAsClient(String host, int port) throws InterruptedException {
- ChannelFuture f = b.connect(host, port).sync();
- connect = !connect;
- return f;
+
+ /**
+ * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom. Should be cleaned up.
+ */
+ private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
+ final SettableFuture<RECEIVE> future;
+ T handshakeType;
+ SEND handshakeValue;
+ String host;
+ int port;
+ Class<RECEIVE> responseClass;
+
+ public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
+ super();
+ assert host != null && !host.isEmpty();
+ assert port > 0;
+ logger.debug("Creating new handshake thread to connec to {}:{}", host, port);
+ this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
+ future = SettableFuture.create();
+ this.handshakeType = handshakeType;
+ this.handshakeValue = handshakeValue;
+ this.host = host;
+ this.port = port;
+ this.responseClass = responseClass;
+ }
+
+ @Override
+ public void run() {
+ try {
+ logger.debug("Starting to get client connection on host {}, port {}.", host, port);
+
+ ChannelFuture f = b.connect(host, port);
+ f.sync();
+ if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
+ connect = !connect;
+ logger.debug("Client connected, sending handshake.");
+ DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
+ future.set(fut.checkedGet());
+ logger.debug("Got bit client connection.");
+ } catch (Exception e) {
+ logger.debug("Failed to get client connection.", e);
+ future.setException(e);
+ }
+ }
+
+ }
+
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
+ SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
+ RpcException {
+
+
+ HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
+ ht.start();
+ try{
+ return ht.future.get();
+ }catch(Exception e){
+ throw new RpcException(e);
+ }
+
}
public void close() {
logger.debug("Closing client");
- b.shutdown();
+ connection.getChannel().close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
new file mode 100644
index 0000000..0e62f14
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+
+import com.google.protobuf.Internal.EnumLite;
+
+public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
+
+ public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+ super(rpcMapping, alloc, eventLoopGroup);
+ }
+
+ @Override
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
+ return getCloseHandler(clientConnection.getChannel());
+ }
+
+ @Override
+ protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ return handle(rpcType, pBody, dBody);
+ }
+
+ protected abstract Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
+
+ @Override
+ public ServerConnection initRemoteConnection(Channel channel) {
+ return new ServerConnection(channel);
+ }
+
+ public static class ServerConnection extends RemoteConnection{
+
+ public ServerConnection(Channel channel) {
+ super(channel);
+ }
+
+ }
+
+
+}