You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/05/23 06:45:34 UTC

[flink] branch master updated: [FLINK-12563][table-runtime-blink] Introduce vector data format in blink

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e6ce1b  [FLINK-12563][table-runtime-blink] Introduce vector data format in blink
1e6ce1b is described below

commit 1e6ce1bb0c38be3ccf6baa5f3c7acedb86528080
Author: Jingsong Lee <lz...@aliyun.com>
AuthorDate: Thu May 23 14:45:24 2019 +0800

    [FLINK-12563][table-runtime-blink] Introduce vector data format in blink
    
    This closes #8492
---
 .../apache/flink/table/dataformat/ColumnarRow.java | 210 +++++++++++++++++++++
 .../dataformat/vector/AbstractColumnVector.java    |  62 ++++++
 .../dataformat/vector/BooleanColumnVector.java     |  26 +++
 .../table/dataformat/vector/ByteColumnVector.java  |  26 +++
 .../table/dataformat/vector/BytesColumnVector.java |  48 +++++
 .../table/dataformat/vector/ColumnVector.java      |  32 ++++
 .../flink/table/dataformat/vector/Dictionary.java  |  34 ++++
 .../dataformat/vector/DoubleColumnVector.java      |  26 +++
 .../table/dataformat/vector/FloatColumnVector.java |  26 +++
 .../table/dataformat/vector/IntColumnVector.java   |  26 +++
 .../table/dataformat/vector/LongColumnVector.java  |  26 +++
 .../table/dataformat/vector/ShortColumnVector.java |  26 +++
 .../dataformat/vector/VectorizedColumnBatch.java   | 134 +++++++++++++
 .../dataformat/vector/heap/AbstractHeapVector.java | 132 +++++++++++++
 .../dataformat/vector/heap/HeapBooleanVector.java  |  51 +++++
 .../dataformat/vector/heap/HeapByteVector.java     |  50 +++++
 .../dataformat/vector/heap/HeapBytesVector.java    | 137 ++++++++++++++
 .../dataformat/vector/heap/HeapDoubleVector.java   |  52 +++++
 .../dataformat/vector/heap/HeapFloatVector.java    |  51 +++++
 .../dataformat/vector/heap/HeapIntVector.java      |  50 +++++
 .../dataformat/vector/heap/HeapLongVector.java     |  50 +++++
 .../dataformat/vector/heap/HeapShortVector.java    |  50 +++++
 .../vector/VectorizedColumnBatchTest.java          | 190 +++++++++++++++++++
 23 files changed, 1515 insertions(+)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
new file mode 100644
index 0000000..8764c98
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ColumnarRow.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat;
+
+import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
+import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
+
+/**
+ * Columnar row: a {@link BaseRow} view of one row (selected by a row id) of a {@link VectorizedColumnBatch}.
+ */
+public final class ColumnarRow implements BaseRow {
+	private byte header; // returned by getHeader(), replaced by setHeader()
+	private VectorizedColumnBatch vectorizedColumnBatch; // batch that every getter delegates to
+	private int rowId; // row index handed to the batch on every access
+
+	public ColumnarRow() {} // no batch yet: call setVectorizedColumnBatch before using any getter
+
+	public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) {
+		this(vectorizedColumnBatch, 0); // start the view at row 0
+	}
+
+	public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
+		this.vectorizedColumnBatch = vectorizedColumnBatch;
+		this.rowId = rowId;
+	}
+
+	public void setVectorizedColumnBatch(
+			VectorizedColumnBatch vectorizedColumnBatch) {
+		this.vectorizedColumnBatch = vectorizedColumnBatch;
+		this.rowId = 0; // switching batches rewinds the view to row 0
+	}
+
+	public void setRowId(int rowId) {
+		this.rowId = rowId; // no bounds check is performed here
+	}
+
+	@Override
+	public byte getHeader() {
+		return header;
+	}
+
+	@Override
+	public void setHeader(byte header) {
+		this.header = header;
+	}
+
+	@Override
+	public int getArity() {
+		return vectorizedColumnBatch.getArity();
+	}
+
+	@Override
+	public boolean isNullAt(int ordinal) {
+		return vectorizedColumnBatch.isNullAt(rowId, ordinal);
+	}
+
+	@Override
+	public boolean getBoolean(int ordinal) {
+		return vectorizedColumnBatch.getBoolean(rowId, ordinal);
+	}
+
+	@Override
+	public byte getByte(int ordinal) {
+		return vectorizedColumnBatch.getByte(rowId, ordinal);
+	}
+
+	@Override
+	public short getShort(int ordinal) {
+		return vectorizedColumnBatch.getShort(rowId, ordinal);
+	}
+
+	@Override
+	public int getInt(int ordinal) {
+		return vectorizedColumnBatch.getInt(rowId, ordinal);
+	}
+
+	@Override
+	public long getLong(int ordinal) {
+		return vectorizedColumnBatch.getLong(rowId, ordinal);
+	}
+
+	@Override
+	public float getFloat(int ordinal) {
+		return vectorizedColumnBatch.getFloat(rowId, ordinal);
+	}
+
+	@Override
+	public double getDouble(int ordinal) {
+		return vectorizedColumnBatch.getDouble(rowId, ordinal);
+	}
+
+	@Override
+	public BinaryString getString(int ordinal) {
+		Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, ordinal);
+		return BinaryString.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
+	}
+
+	@Override
+	public Decimal getDecimal(int ordinal, int precision, int scale) {
+		return vectorizedColumnBatch.getDecimal(rowId, ordinal, precision, scale);
+	}
+
+	@Override
+	public <T> BinaryGeneric<T> getGeneric(int pos) {
+		throw new UnsupportedOperationException("GenericType is not supported.");
+	}
+
+	@Override
+	public byte[] getBinary(int ordinal) {
+		Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, ordinal);
+		if (byteArray.len == byteArray.data.length) { // value spans the whole backing array
+			return byteArray.data; // returned without copying, so the caller shares the vector's array
+		} else {
+			byte[] ret = new byte[byteArray.len];
+			System.arraycopy(byteArray.data, byteArray.offset, ret, 0, byteArray.len);
+			return ret;
+		}
+	}
+
+	@Override
+	public BaseRow getRow(int ordinal, int numFields) {
+		// TODO: nested row access is not implemented.
+		throw new UnsupportedOperationException("Row is not supported.");
+	}
+
+	@Override
+	public BinaryArray getArray(int ordinal) {
+		// TODO: array access is not implemented.
+		throw new UnsupportedOperationException("Array is not supported.");
+	}
+
+	@Override
+	public BinaryMap getMap(int ordinal) {
+		// TODO: map access is not implemented.
+		throw new UnsupportedOperationException("Map is not supported.");
+	}
+
+	@Override
+	public void setNullAt(int ordinal) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setBoolean(int ordinal, boolean value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setByte(int ordinal, byte value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setShort(int ordinal, short value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setInt(int ordinal, int value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setLong(int ordinal, long value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setFloat(int pos, float value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setDouble(int ordinal, double value) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public void setDecimal(int ordinal, Decimal value, int precision) {
+		throw new UnsupportedOperationException("Not support the operation!");
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		throw new UnsupportedOperationException(
+				"ColumnarRow do not support equals, please compare fields one by one!");
+	}
+
+	@Override
+	public int hashCode() {
+		throw new UnsupportedOperationException(
+				"ColumnarRow do not support hashCode, please hash fields one by one!");
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java
new file mode 100644
index 0000000..10928af
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/AbstractColumnVector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import java.io.Serializable;
+
+/**
+ * Shared base for {@link ColumnVector} implementations: holds the no-nulls flag and the optional dictionary.
+ * NOTE: a subclass that stores any null value must set {@link #noNulls} to false.
+ */
+public abstract class AbstractColumnVector implements ColumnVector, Serializable {
+
+	private static final long serialVersionUID = 5340018531388047747L;
+
+	// True while the whole column vector contains no nulls; false as soon as it may contain one.
+	protected boolean noNulls = true;
+
+	/**
+	 * The dictionary for this column, or {@code null} when the column has none.
+	 * If it's not null, it will be used to decode the values returned by the getters.
+	 */
+	protected Dictionary dictionary;
+
+	/**
+	 * Replaces the dictionary; passing {@code null} makes {@link #hasDictionary()} return false.
+	 */
+	public void setDictionary(Dictionary dictionary) {
+		this.dictionary = dictionary;
+	}
+
+	/**
+	 * Reserves an integer column for the ids of the dictionary.
+	 * The dictionary ids may be inconsistent with {@link #setDictionary}. Suppose a ColumnVector's data
+	 * comes from two pages. Perhaps one page uses a dictionary and the other page does not use a
+	 * dictionary. The first page that uses a field will have dictionaryIds, which requires
+	 * decoding the first page (a batch does not support a mix of dictionary and non-dictionary data).
+	 */
+	public abstract IntColumnVector reserveDictionaryIds(int capacity);
+
+	/**
+	 * Returns true if this column has a dictionary, i.e. {@link #dictionary} is not null.
+	 */
+	public boolean hasDictionary() {
+		return this.dictionary != null;
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java
new file mode 100644
index 0000000..6f2065d
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BooleanColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code boolean}.
+ */
+public interface BooleanColumnVector extends ColumnVector {
+	boolean getBoolean(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java
new file mode 100644
index 0000000..fae9715
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ByteColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code byte}.
+ */
+public interface ByteColumnVector extends ColumnVector {
+	byte getByte(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java
new file mode 100644
index 0000000..b09020b
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/BytesColumnVector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Bytes column vector to get {@link Bytes}, which holds the backing array together with an offset and a length.
+ * The data in {@link Bytes} may be reused.
+ */
+public interface BytesColumnVector extends ColumnVector {
+	Bytes getBytes(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+
+	/**
+	 * A slice of a byte array: {@code len} bytes of {@code data} starting at {@code offset}.
+	 */
+	class Bytes{
+		public final byte[] data;
+		public final int offset;
+		public final int len;
+
+		public Bytes(byte[] data, int offset, int len) {
+			this.data = data; // the array reference is stored as-is, not copied
+			this.offset = offset;
+			this.len = len;
+		}
+
+		public byte[] getBytes() {
+			byte[] res = new byte[len]; // always a fresh array holding exactly the slice
+			System.arraycopy(data, offset, res, 0, len);
+			return res;
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java
new file mode 100644
index 0000000..ae372a2
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ColumnVector.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Nullable column vector. Access data through the typed sub-interfaces (e.g. {@link IntColumnVector}).
+ */
+public interface ColumnVector {
+
+	boolean isNullAt(int i);
+
+	/**
+	 * Resets the column to its default state.
+	 */
+	void reset();
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java
new file mode 100644
index 0000000..0f7d254
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/Dictionary.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Dictionary used by AbstractColumnVector to decode dictionary-encoded values: each method maps an id to a value.
+ */
+public interface Dictionary {
+
+	int decodeToInt(int id);
+
+	long decodeToLong(int id);
+
+	float decodeToFloat(int id);
+
+	double decodeToDouble(int id);
+
+	byte[] decodeToBinary(int id);
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java
new file mode 100644
index 0000000..0c1ee13
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/DoubleColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code double}.
+ */
+public interface DoubleColumnVector extends ColumnVector {
+	double getDouble(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java
new file mode 100644
index 0000000..6f4a561
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/FloatColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code float}.
+ */
+public interface FloatColumnVector extends ColumnVector {
+	float getFloat(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java
new file mode 100644
index 0000000..9325f11
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/IntColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code int}.
+ */
+public interface IntColumnVector extends ColumnVector {
+	int getInt(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java
new file mode 100644
index 0000000..d27e0aa
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/LongColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code long}.
+ */
+public interface LongColumnVector extends ColumnVector {
+	long getLong(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java
new file mode 100644
index 0000000..7fa00c4
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/ShortColumnVector.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+/**
+ * Column vector whose values are read as {@code short}.
+ */
+public interface ShortColumnVector extends ColumnVector {
+	short getShort(int i); // i: index of the value in this vector (VectorizedColumnBatch passes a row id)
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
new file mode 100644
index 0000000..e6dc2fb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatch.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.vector.BytesColumnVector.Bytes;
+
+import java.io.Serializable;
+
+/**
+ * A VectorizedColumnBatch is a set of rows, organized with each column as a vector. It is the
+ * unit of query execution, organized to minimize the cost per row.
+ *
+ * <p>{@code VectorizedColumnBatch}s are influenced by Apache Hive VectorizedRowBatch.
+ */
+public class VectorizedColumnBatch implements Serializable {
+	private static final long serialVersionUID = 8180323238728166155L;
+
+	/**
+	 * This number is carefully chosen to minimize overhead and typically allows
+	 * one VectorizedColumnBatch to fit in cache.
+	 */
+	public static final int DEFAULT_SIZE = 2048;
+
+	private int numRows; // set through setNumRows(), cleared to 0 by reset()
+	public final ColumnVector[] columns; // one vector per column; columns.length is the arity
+
+	public VectorizedColumnBatch(ColumnVector[] vectors) {
+		this.columns = vectors; // the array is stored as-is, not copied
+	}
+
+	/**
+	 * Resets the batch for writing: resets every column and sets the row count to 0.
+	 */
+	public void reset() {
+		for (ColumnVector column : columns) {
+			column.reset();
+		}
+		this.numRows = 0;
+	}
+
+	public void setNumRows(int numRows) {
+		this.numRows = numRows;
+	}
+
+	public int getNumRows() {
+		return numRows;
+	}
+
+	public int getArity() {
+		return columns.length;
+	}
+
+	public boolean isNullAt(int rowId, int colId) {
+		return columns[colId].isNullAt(rowId);
+	}
+
+	public boolean getBoolean(int rowId, int colId) { // typed getters cast the column; a wrong type fails with ClassCastException
+		return ((BooleanColumnVector) columns[colId]).getBoolean(rowId);
+	}
+
+	public byte getByte(int rowId, int colId) {
+		return ((ByteColumnVector) columns[colId]).getByte(rowId);
+	}
+
+	public short getShort(int rowId, int colId) {
+		return ((ShortColumnVector) columns[colId]).getShort(rowId);
+	}
+
+	public int getInt(int rowId, int colId) {
+		return ((IntColumnVector) columns[colId]).getInt(rowId);
+	}
+
+	public long getLong(int rowId, int colId) {
+		return ((LongColumnVector) columns[colId]).getLong(rowId);
+	}
+
+	public float getFloat(int rowId, int colId) {
+		return ((FloatColumnVector) columns[colId]).getFloat(rowId);
+	}
+
+	public double getDouble(int rowId, int colId) {
+		return ((DoubleColumnVector) columns[colId]).getDouble(rowId);
+	}
+
+	public Bytes getByteArray(int rowId, int colId) {
+		return ((BytesColumnVector) columns[colId]).getBytes(rowId);
+	}
+
+	private byte[] getBytes(int rowId, int colId) { // value of a bytes column as a plain array
+		Bytes byteArray = getByteArray(rowId, colId);
+		if (byteArray.len == byteArray.data.length) { // value spans the whole backing array: skip the copy
+			return byteArray.data;
+		} else {
+			return byteArray.getBytes();
+		}
+	}
+
+	public String getString(int rowId, int colId) { // decodes as UTF-8 instead of the platform-default charset
+		Bytes byteArray = getByteArray(rowId, colId);
+		return new String(byteArray.data, byteArray.offset, byteArray.len, java.nio.charset.StandardCharsets.UTF_8);
+	}
+
+	public Decimal getDecimal(int rowId, int colId, int precision, int scale) {
+		if (isNullAt(rowId, colId)) {
+			return null;
+		}
+
+		if (Decimal.is32BitDecimal(precision)) { // unscaled value is read from an int column
+			return Decimal.fromUnscaledLong(precision, scale, getInt(rowId, colId));
+		} else if (Decimal.is64BitDecimal(precision)) { // unscaled value is read from a long column
+			return Decimal.fromUnscaledLong(precision, scale, getLong(rowId, colId));
+		} else { // otherwise the unscaled value is read from a bytes column
+			byte[] bytes = getBytes(rowId, colId);
+			return Decimal.fromUnscaledBytes(precision, scale, bytes);
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
new file mode 100644
index 0000000..2af5be5
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.Decimal;
+import org.apache.flink.table.dataformat.vector.AbstractColumnVector;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.util.Arrays;
+
+/**
+ * Base class of the heap column vectors: it holds the null flags and the dictionary id
+ * vector that all of them share.
+ */
+public abstract class AbstractHeapVector extends AbstractColumnVector {
+
+	/**
+	 * Null flags, one per row: an entry is true if the value at that index is null.
+	 * The array is always allocated, so a batch can be re-used later and nulls added.
+	 */
+	protected boolean[] isNull;
+
+	/**
+	 * Reusable column for ids of dictionary.
+	 */
+	protected HeapIntVector dictionaryIds;
+
+	/**
+	 * Allocates the null flags for {@code len} rows.
+	 *
+	 * @param len the number of rows
+	 */
+	public AbstractHeapVector(int len) {
+		isNull = new boolean[len];
+	}
+
+	/**
+	 * Resets the column to default state.
+	 * - fills the isNull array with false.
+	 * - sets noNulls to true.
+	 */
+	@Override
+	public void reset() {
+		// The flags only need clearing if at least one null was recorded.
+		if (!noNulls) {
+			Arrays.fill(isNull, false);
+		}
+		noNulls = true;
+	}
+
+	/**
+	 * Marks the value at index {@code i} as null.
+	 *
+	 * @param i index of the row
+	 */
+	public void setNullAt(int i) {
+		isNull[i] = true;
+		noNulls = false;
+	}
+
+	@Override
+	public boolean isNullAt(int i) {
+		return !noNulls && isNull[i];
+	}
+
+	/**
+	 * Returns a dictionary id vector that can hold at least {@code capacity} ids.
+	 *
+	 * <p>The cached vector is reset and re-used when it is large enough; otherwise a new
+	 * one is allocated, so that callers writing {@code capacity} ids never run past the
+	 * end of a smaller vector left over from an earlier call.
+	 *
+	 * @param capacity number of ids the caller is going to write
+	 * @return the vector to write the dictionary ids into
+	 */
+	@Override
+	public HeapIntVector reserveDictionaryIds(int capacity) {
+		if (dictionaryIds == null || dictionaryIds.vector.length < capacity) {
+			dictionaryIds = new HeapIntVector(capacity);
+		} else {
+			dictionaryIds.reset();
+		}
+		return dictionaryIds;
+	}
+
+	/**
+	 * Returns the underlying integer column for ids of dictionary.
+	 */
+	public HeapIntVector getDictionaryIds() {
+		return dictionaryIds;
+	}
+
+	/**
+	 * Creates one heap vector per field type, each with room for {@code maxRows} rows.
+	 *
+	 * @param fieldTypes logical types of the fields, in column order
+	 * @param maxRows    number of rows every vector is allocated for
+	 * @return the vectors, in the same order as {@code fieldTypes}
+	 */
+	public static AbstractHeapVector[] allocateHeapVectors(LogicalType[] fieldTypes, int maxRows) {
+		AbstractHeapVector[] columns = new AbstractHeapVector[fieldTypes.length];
+		for (int i = 0; i < fieldTypes.length; i++) {
+			columns[i] = createHeapColumn(fieldTypes[i], maxRows);
+		}
+		return columns;
+	}
+
+	/**
+	 * Creates the heap vector used to store values of the given logical type.
+	 *
+	 * @param fieldType logical type of the column
+	 * @param maxRows   number of rows the vector is allocated for
+	 * @return a new vector for that type
+	 * @throws UnsupportedOperationException if the type root has no matching vector
+	 */
+	public static AbstractHeapVector createHeapColumn(LogicalType fieldType, int maxRows) {
+		switch (fieldType.getTypeRoot()) {
+			case BOOLEAN:
+				return new HeapBooleanVector(maxRows);
+			case TINYINT:
+				return new HeapByteVector(maxRows);
+			case DOUBLE:
+				return new HeapDoubleVector(maxRows);
+			case FLOAT:
+				return new HeapFloatVector(maxRows);
+			case INTEGER:
+			case DATE:
+			case TIME_WITHOUT_TIME_ZONE:
+				return new HeapIntVector(maxRows);
+			case BIGINT:
+			case TIMESTAMP_WITHOUT_TIME_ZONE:
+				return new HeapLongVector(maxRows);
+			case DECIMAL:
+				// The vector kind follows the precision: int, long or bytes.
+				DecimalType decimalType = (DecimalType) fieldType;
+				if (Decimal.is32BitDecimal(decimalType.getPrecision())) {
+					return new HeapIntVector(maxRows);
+				} else if (Decimal.is64BitDecimal(decimalType.getPrecision())) {
+					return new HeapLongVector(maxRows);
+				} else {
+					return new HeapBytesVector(maxRows);
+				}
+			case SMALLINT:
+				return new HeapShortVector(maxRows);
+			case VARCHAR:
+			case VARBINARY:
+				return new HeapBytesVector(maxRows);
+			default:
+				throw new UnsupportedOperationException(fieldType + " is not supported now.");
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java
new file mode 100644
index 0000000..52c8f0d1
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBooleanVector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.BooleanColumnVector;
+
+/**
+ * Heap column vector of nullable boolean values. It has no dictionary: both dictionary
+ * accessors throw.
+ */
+public class HeapBooleanVector extends AbstractHeapVector implements BooleanColumnVector {
+
+	private static final long serialVersionUID = 4131239076731313596L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public boolean[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapBooleanVector(int len) {
+		super(len);
+		this.vector = new boolean[len];
+	}
+
+	/** Always throws, because this vector has no dictionary. */
+	@Override
+	public HeapIntVector reserveDictionaryIds(int capacity) {
+		throw noDictionary();
+	}
+
+	/** Always throws, because this vector has no dictionary. */
+	@Override
+	public HeapIntVector getDictionaryIds() {
+		throw noDictionary();
+	}
+
+	@Override
+	public boolean getBoolean(int i) {
+		return this.vector[i];
+	}
+
+	/** Builds the exception shared by the two dictionary accessors. */
+	private static RuntimeException noDictionary() {
+		return new RuntimeException("HeapBooleanVector has no dictionary.");
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java
new file mode 100644
index 0000000..e084554
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapByteVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.ByteColumnVector;
+
+/**
+ * Heap column vector of nullable byte values, optionally read through a dictionary.
+ */
+public class HeapByteVector extends AbstractHeapVector implements ByteColumnVector {
+
+	private static final long serialVersionUID = 7216045902943789034L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public byte[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapByteVector(int len) {
+		super(len);
+		this.vector = new byte[len];
+	}
+
+	/**
+	 * Returns the byte of row {@code i}. If a dictionary is set, the row's dictionary id is
+	 * decoded to an int and narrowed to a byte instead of reading {@code vector}.
+	 */
+	@Override
+	public byte getByte(int i) {
+		if (dictionary != null) {
+			int decoded = dictionary.decodeToInt(dictionaryIds.vector[i]);
+			return (byte) decoded;
+		}
+		return vector[i];
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java
new file mode 100644
index 0000000..7d3a992
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapBytesVector.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.BytesColumnVector;
+
+/**
+ * Heap column vector for nullable string and binary data.
+ *
+ * <p>Values written with {@code setVal} are copied one after another into a single
+ * growable {@link #buffer}; {@link #start} and {@link #length} record where the bytes of
+ * each element live inside that buffer. When a dictionary is set, {@link #getBytes(int)}
+ * decodes the element from the dictionary instead of reading the buffer.
+ */
+public class HeapBytesVector extends AbstractHeapVector implements BytesColumnVector {
+
+	private static final long serialVersionUID = -8529155738773478597L;
+
+	/**
+	 * start offset of each field.
+	 */
+	public int[] start;
+
+	/**
+	 * The length of each field.
+	 */
+	public int[] length;
+
+	/**
+	 * buffer to use when actually copying in data.
+	 */
+	public byte[] buffer;
+
+	/**
+	 * Number of bytes of {@link #buffer} already occupied; the next value is copied in at
+	 * this offset.
+	 */
+	private int elementsAppended = 0;
+
+	/**
+	 * Current size of {@link #buffer} in bytes.
+	 */
+	private int capacity;
+
+	/**
+	 * Creates a vector for {@code size} elements. The data buffer initially has
+	 * {@code size} bytes as well and grows on demand.
+	 *
+	 * @param size number of elements in the column vector
+	 */
+	public HeapBytesVector(int size) {
+		super(size);
+		capacity = size;
+		buffer = new byte[capacity];
+		start = new int[size];
+		length = new int[size];
+	}
+
+	/**
+	 * Resets the null flags and starts writing at the beginning of the buffer again.
+	 */
+	@Override
+	public void reset() {
+		super.reset();
+		elementsAppended = 0;
+	}
+
+	/**
+	 * Sets a field by copying the given bytes to the end of the local buffer.
+	 *
+	 * @param elementNum index within column vector to set
+	 * @param sourceBuf  container of source data
+	 * @param start      start byte position within source
+	 * @param length     length of source byte sequence
+	 * @throws UnsupportedOperationException if the buffer cannot be grown far enough
+	 */
+	public void setVal(int elementNum, byte[] sourceBuf, int start, int length) {
+		reserve(elementsAppended + length);
+		System.arraycopy(sourceBuf, start, buffer, elementsAppended, length);
+		this.start[elementNum] = elementsAppended;
+		this.length[elementNum] = length;
+		elementsAppended += length;
+	}
+
+	/**
+	 * Sets a field by copying the whole given array to the end of the local buffer.
+	 *
+	 * @param elementNum index within column vector to set
+	 * @param sourceBuf  container of source data
+	 * @throws UnsupportedOperationException if the buffer cannot be grown far enough
+	 */
+	public void setVal(int elementNum, byte[] sourceBuf) {
+		setVal(elementNum, sourceBuf, 0, sourceBuf.length);
+	}
+
+	/**
+	 * Makes sure the buffer holds at least {@code requiredCapacity} bytes, growing it to
+	 * twice the requested size when it is too small.
+	 *
+	 * @param requiredCapacity number of bytes the buffer must be able to hold
+	 * @throws UnsupportedOperationException if the request overflowed or cannot be allocated
+	 */
+	private void reserve(int requiredCapacity) {
+		if (requiredCapacity < 0) {
+			// The sum computed by setVal wrapped around (or the length was negative).
+			throw new UnsupportedOperationException(requiredCapacity + " cannot be satisfied.");
+		}
+		if (requiredCapacity <= capacity) {
+			return;
+		}
+		// Double the request, but never let the doubling itself overflow.
+		int newCapacity = requiredCapacity > Integer.MAX_VALUE / 2
+				? Integer.MAX_VALUE
+				: requiredCapacity * 2;
+		try {
+			byte[] newData = new byte[newCapacity];
+			System.arraycopy(buffer, 0, newData, 0, elementsAppended);
+			buffer = newData;
+			capacity = newCapacity;
+		} catch (OutOfMemoryError outOfMemoryError) {
+			throw new UnsupportedOperationException(requiredCapacity + " cannot be satisfied.", outOfMemoryError);
+		}
+	}
+
+	/**
+	 * Returns the bytes of element {@code i}: a view onto the shared buffer when no
+	 * dictionary is set, otherwise the array decoded from the dictionary.
+	 */
+	@Override
+	public Bytes getBytes(int i) {
+		if (dictionary == null) {
+			return new Bytes(buffer, start[i], length[i]);
+		} else {
+			byte[] bytes = dictionary.decodeToBinary(dictionaryIds.vector[i]);
+			return new Bytes(bytes, 0, bytes.length);
+		}
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java
new file mode 100644
index 0000000..9e83bf3
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapDoubleVector.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.DoubleColumnVector;
+
+/**
+ * Heap column vector of nullable double precision floating point values, optionally read
+ * through a dictionary.
+ */
+public class HeapDoubleVector extends AbstractHeapVector implements DoubleColumnVector {
+
+	private static final long serialVersionUID = 6193940154117411328L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public double[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapDoubleVector(int len) {
+		super(len);
+		this.vector = new double[len];
+	}
+
+	/**
+	 * Returns the double of row {@code i}, decoded from the dictionary when one is set.
+	 */
+	@Override
+	public double getDouble(int i) {
+		return dictionary == null
+				? vector[i]
+				: dictionary.decodeToDouble(dictionaryIds.vector[i]);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java
new file mode 100644
index 0000000..116f59a
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapFloatVector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.FloatColumnVector;
+
+/**
+ * Heap column vector of nullable single precision floating point values, optionally read
+ * through a dictionary.
+ */
+public class HeapFloatVector extends AbstractHeapVector implements FloatColumnVector {
+
+	private static final long serialVersionUID = 8928878923550041110L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public float[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapFloatVector(int len) {
+		super(len);
+		this.vector = new float[len];
+	}
+
+	/**
+	 * Returns the float of row {@code i}, decoded from the dictionary when one is set.
+	 */
+	@Override
+	public float getFloat(int i) {
+		if (dictionary != null) {
+			return dictionary.decodeToFloat(dictionaryIds.vector[i]);
+		}
+		return vector[i];
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java
new file mode 100644
index 0000000..dfb0aeb
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapIntVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.IntColumnVector;
+
+/**
+ * Heap column vector of nullable int values, optionally read through a dictionary.
+ */
+public class HeapIntVector extends AbstractHeapVector implements IntColumnVector {
+
+	private static final long serialVersionUID = -2749499358889718254L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public int[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapIntVector(int len) {
+		super(len);
+		this.vector = new int[len];
+	}
+
+	/**
+	 * Returns the int of row {@code i}, decoded from the dictionary when one is set.
+	 */
+	@Override
+	public int getInt(int i) {
+		return dictionary == null
+				? vector[i]
+				: dictionary.decodeToInt(dictionaryIds.vector[i]);
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java
new file mode 100644
index 0000000..479a592
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapLongVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.LongColumnVector;
+
+/**
+ * Heap column vector of nullable long values, optionally read through a dictionary.
+ */
+public class HeapLongVector extends AbstractHeapVector implements LongColumnVector {
+
+	private static final long serialVersionUID = 8534925169458006397L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public long[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapLongVector(int len) {
+		super(len);
+		this.vector = new long[len];
+	}
+
+	/**
+	 * Returns the long of row {@code i}, decoded from the dictionary when one is set.
+	 */
+	@Override
+	public long getLong(int i) {
+		if (dictionary != null) {
+			return dictionary.decodeToLong(dictionaryIds.vector[i]);
+		}
+		return vector[i];
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java
new file mode 100644
index 0000000..c4dbf83
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/HeapShortVector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector.heap;
+
+import org.apache.flink.table.dataformat.vector.ShortColumnVector;
+
+/**
+ * Heap column vector of nullable short values, optionally read through a dictionary.
+ */
+public class HeapShortVector extends AbstractHeapVector implements ShortColumnVector {
+
+	private static final long serialVersionUID = -8278486456144676292L;
+
+	/** Values of the column; entry {@code i} belongs to row {@code i}. */
+	public short[] vector;
+
+	/**
+	 * Creates a vector with room for {@code len} values.
+	 *
+	 * @param len the number of rows
+	 */
+	public HeapShortVector(int len) {
+		super(len);
+		this.vector = new short[len];
+	}
+
+	/**
+	 * Returns the short of row {@code i}. If a dictionary is set, the row's dictionary id
+	 * is decoded to an int and narrowed to a short instead of reading {@code vector}.
+	 */
+	@Override
+	public short getShort(int i) {
+		if (dictionary != null) {
+			int decoded = dictionary.decodeToInt(dictionaryIds.vector[i]);
+			return (short) decoded;
+		}
+		return vector[i];
+	}
+}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
new file mode 100644
index 0000000..049b934
--- /dev/null
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/vector/VectorizedColumnBatchTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataformat.vector;
+
+import org.apache.flink.table.dataformat.ColumnarRow;
+import org.apache.flink.table.dataformat.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapByteVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapBytesVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapFloatVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapIntVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapLongVector;
+import org.apache.flink.table.dataformat.vector.heap.HeapShortVector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link VectorizedColumnBatch}.
+ */
+public class VectorizedColumnBatchTest {
+
+	private static final int VECTOR_SIZE = 1024;
+
+	/**
+	 * Fills one vector of every heap type and reads all values back through
+	 * {@link ColumnarRow}.
+	 */
+	@Test
+	public void testTyped() {
+		HeapBooleanVector col0 = new HeapBooleanVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col0.vector[i] = i % 2 == 0;
+		}
+
+		HeapBytesVector col1 = new HeapBytesVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			byte[] bytes = String.valueOf(i).getBytes();
+			col1.setVal(i, bytes, 0, bytes.length);
+		}
+
+		HeapByteVector col2 = new HeapByteVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col2.vector[i] = (byte) i;
+		}
+
+		HeapDoubleVector col3 = new HeapDoubleVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col3.vector[i] = i;
+		}
+
+		HeapFloatVector col4 = new HeapFloatVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col4.vector[i] = i;
+		}
+
+		HeapIntVector col5 = new HeapIntVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col5.vector[i] = i;
+		}
+
+		HeapLongVector col6 = new HeapLongVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col6.vector[i] = i;
+		}
+
+		HeapShortVector col7 = new HeapShortVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col7.vector[i] = (short) i;
+		}
+
+		VectorizedColumnBatch batch = new VectorizedColumnBatch(
+				new ColumnVector[]{col0, col1, col2, col3, col4, col5, col6, col7});
+
+		// assertEquals takes (expected, actual); keeping that order makes failure messages correct.
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			ColumnarRow row = new ColumnarRow(batch, i);
+			assertEquals(i % 2 == 0, row.getBoolean(0));
+			assertEquals(String.valueOf(i), row.getString(1).toString());
+			assertEquals((byte) i, row.getByte(2));
+			assertEquals((double) i, row.getDouble(3), 0);
+			assertEquals((float) i, row.getFloat(4), 0);
+			assertEquals(i, row.getInt(5));
+			assertEquals((long) i, row.getLong(6));
+			assertEquals((short) i, row.getShort(7));
+		}
+	}
+
+	/**
+	 * Checks null handling for a column that is entirely null and one where every other
+	 * row is null.
+	 */
+	@Test
+	public void testNull() {
+		// all null
+		HeapIntVector col0 = new HeapIntVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			col0.setNullAt(i);
+		}
+
+		// some null
+		HeapIntVector col1 = new HeapIntVector(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			if (i % 2 == 0) {
+				col1.setNullAt(i);
+			} else {
+				col1.vector[i] = i;
+			}
+		}
+
+		VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col0, col1});
+
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			ColumnarRow row = new ColumnarRow(batch, i);
+			assertTrue(row.isNullAt(0));
+			if (i % 2 == 0) {
+				assertTrue(row.isNullAt(1));
+			} else {
+				assertEquals(i, row.getInt(1));
+			}
+		}
+	}
+
+	/**
+	 * Checks that an int column with a dictionary returns the decoded dictionary values.
+	 */
+	@Test
+	public void testDictionary() {
+		// dictionary-encoded column: even rows use id 0, odd rows use id 1
+		HeapIntVector col = new HeapIntVector(VECTOR_SIZE);
+		int[] dict = new int[2];
+		dict[0] = 1998;
+		dict[1] = 9998;
+		col.setDictionary(new TestDictionary(dict));
+		HeapIntVector heapIntVector = col.reserveDictionaryIds(VECTOR_SIZE);
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			heapIntVector.vector[i] = i % 2 == 0 ? 0 : 1;
+		}
+
+		VectorizedColumnBatch batch = new VectorizedColumnBatch(new ColumnVector[]{col});
+
+		for (int i = 0; i < VECTOR_SIZE; i++) {
+			ColumnarRow row = new ColumnarRow(batch, i);
+			if (i % 2 == 0) {
+				assertEquals(1998, row.getInt(0));
+			} else {
+				assertEquals(9998, row.getInt(0));
+			}
+		}
+	}
+
+	/**
+	 * Dictionary backed by an int array; only {@link #decodeToInt(int)} is supported.
+	 * Declared static because it does not use the enclosing test instance.
+	 */
+	private static final class TestDictionary implements Dictionary {
+		private final int[] intDictionary;
+
+		public TestDictionary(int[] dictionary) {
+			this.intDictionary = dictionary;
+		}
+
+		@Override
+		public int decodeToInt(int id) {
+			return intDictionary[id];
+		}
+
+		@Override
+		public long decodeToLong(int id) {
+			throw new UnsupportedOperationException("Dictionary encoding does not support long");
+		}
+
+		@Override
+		public float decodeToFloat(int id) {
+			throw new UnsupportedOperationException("Dictionary encoding does not support float");
+		}
+
+		@Override
+		public double decodeToDouble(int id) {
+			throw new UnsupportedOperationException("Dictionary encoding does not support double");
+		}
+
+		@Override
+		public byte[] decodeToBinary(int id) {
+			throw new UnsupportedOperationException("Dictionary encoding does not support String");
+		}
+	}
+}