You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/09/07 19:27:35 UTC

[incubator-pulsar] branch master updated: support nested fields in Pulsar presto connector (#2515)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2023b48  support nested fields in Pulsar presto connector (#2515)
2023b48 is described below

commit 2023b489ade528e622c0f203e5da7d377ec2772a
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Sep 7 12:27:33 2018 -0700

    support nested fields in Pulsar presto connector (#2515)
    
    * support nested fields in Pulsar presto connector
    
    * cleaning up
    
    * removing debug messages
---
 .../org/apache/pulsar/io/twitter/TweetData.java    |   7 -
 .../pulsar/sql/presto/AvroSchemaHandler.java       |  18 +-
 .../pulsar/sql/presto/JSONSchemaHandler.java       |  14 +-
 .../pulsar/sql/presto/PulsarColumnHandle.java      |  81 +--
 .../pulsar/sql/presto/PulsarColumnMetadata.java    |  50 +-
 .../pulsar/sql/presto/PulsarInternalColumn.java    |   4 +-
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  77 ++-
 .../pulsar/sql/presto/PulsarRecordCursor.java      |   7 -
 .../pulsar/sql/presto/TestPulsarConnector.java     | 625 +++++++++++++++------
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  56 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |  90 ++-
 11 files changed, 689 insertions(+), 340 deletions(-)

diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 36d4dc8..e5cb79c 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -82,13 +82,6 @@ public class TweetData {
         private Boolean defaultProfileImage;
     }
     @Data
-    public static class Url {
-        private String url;
-        private String expandedUrl;
-        private String displayUrl;
-        private List<Long> indices = null;
-    }
-    @Data
     public static class RetweetedStatus {
         private String createdAt;
         private Long id;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index f8cec40..402a36e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -55,9 +55,23 @@ public class AvroSchemaHandler implements SchemaHandler {
     public Object extractField(int index, Object currentRecord) {
         try {
             GenericRecord record = (GenericRecord) currentRecord;
-            return record.get(this.columnHandles.get(index).getPositionIndex());
+            PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
+            Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
+            Object curr = record.get(positionIndices[0]);
+            if (curr == null) {
+                return null;
+            }
+            if (positionIndices.length > 0) {
+                for (int i = 1 ; i < positionIndices.length; i++) {
+                    curr = ((GenericRecord) curr).get(positionIndices[i]);
+                    if (curr == null) {
+                        return null;
+                    }
+                }
+            }
+            return curr;
         } catch (Exception ex) {
-            log.error(ex);
+            log.debug(ex,"%s", ex);
         }
         return null;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index f72ad0e..aed7e0a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.shade.com.google.gson.JsonElement;
 import org.apache.pulsar.shade.com.google.gson.JsonObject;
 import org.apache.pulsar.shade.com.google.gson.JsonParser;
 
+import java.util.Arrays;
 import java.util.List;
 
 import static com.facebook.presto.spi.type.IntegerType.INTEGER;
@@ -54,10 +55,19 @@ public class JSONSchemaHandler implements SchemaHandler {
         try {
             JsonObject jsonObject = (JsonObject) currentRecord;
             PulsarColumnHandle pulsarColumnHandle = columnHandles.get(index);
-            JsonElement field = jsonObject.get(pulsarColumnHandle.getName());
+
+            String[] fieldNames = pulsarColumnHandle.getFieldNames();
+            JsonElement field = jsonObject.get(fieldNames[0]);
             if (field.isJsonNull()) {
                 return null;
             }
+            for (int i = 1; i < fieldNames.length ; i++) {
+                field = field.getAsJsonObject().get(fieldNames[i]);
+                if (field.isJsonNull()) {
+                    return null;
+                }
+            }
+
             Type type = pulsarColumnHandle.getType();
             Class<?> javaType = type.getJavaType();
 
@@ -81,7 +91,7 @@ public class JSONSchemaHandler implements SchemaHandler {
                 return null;
             }
         } catch (Exception ex) {
-            log.error(ex);
+            log.debug(ex,"%s", ex);
         }
         return null;
     }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index def8cc3..f98e864 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -24,9 +24,8 @@ import com.facebook.presto.spi.type.Type;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.Objects;
+import java.util.Arrays;
 
-import static com.google.common.base.MoreObjects.toStringHelper;
 import static java.util.Objects.requireNonNull;
 
 public class PulsarColumnHandle implements ColumnHandle {
@@ -53,10 +52,9 @@ public class PulsarColumnHandle implements ColumnHandle {
      */
     private final boolean internal;
 
-    /**
-     * The index of the field in the schema associated with this column.
-     */
-    private Integer positionIndex;
+    private final String[] fieldNames;
+
+    private final Integer[] positionIndices;
 
     @JsonCreator
     public PulsarColumnHandle(
@@ -65,13 +63,15 @@ public class PulsarColumnHandle implements ColumnHandle {
             @JsonProperty("type") Type type,
             @JsonProperty("hidden") boolean hidden,
             @JsonProperty("internal") boolean internal,
-            @JsonProperty("positionIndex") Integer positionIndex) {
+            @JsonProperty("fieldNames") String[] fieldNames,
+            @JsonProperty("positionIndices") Integer[] positionIndices) {
         this.connectorId = requireNonNull(connectorId, "connectorId is null");
         this.name = requireNonNull(name, "name is null");
         this.type = requireNonNull(type, "type is null");
         this.hidden = hidden;
         this.internal = internal;
-        this.positionIndex = positionIndex;
+        this.fieldNames = fieldNames;
+        this.positionIndices = positionIndices;
     }
 
     @JsonProperty
@@ -100,8 +100,13 @@ public class PulsarColumnHandle implements ColumnHandle {
     }
 
     @JsonProperty
-    public Integer getPositionIndex() {
-        return positionIndex;
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    @JsonProperty
+    public Integer[] getPositionIndices() {
+        return positionIndices;
     }
 
 
@@ -110,37 +115,43 @@ public class PulsarColumnHandle implements ColumnHandle {
     }
 
     @Override
-    public int hashCode() {
-        return Objects.hash(connectorId, name, type, hidden, internal);
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        PulsarColumnHandle that = (PulsarColumnHandle) o;
+
+        if (hidden != that.hidden) return false;
+        if (internal != that.internal) return false;
+        if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
+        if (name != null ? !name.equals(that.name) : that.name != null) return false;
+        if (type != null ? !type.equals(that.type) : that.type != null) return false;
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
     @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-
-        PulsarColumnHandle other = (PulsarColumnHandle) obj;
-        return Objects.equals(this.connectorId, other.connectorId) &&
-                Objects.equals(this.name, other.name) &&
-                Objects.equals(this.type, other.type) &&
-                Objects.equals(this.hidden, other.hidden) &&
-                Objects.equals(this.internal, other.internal) &&
-                Objects.equals(this.positionIndex, other.positionIndex);
+    public int hashCode() {
+        int result = connectorId != null ? connectorId.hashCode() : 0;
+        result = 31 * result + (name != null ? name.hashCode() : 0);
+        result = 31 * result + (type != null ? type.hashCode() : 0);
+        result = 31 * result + (hidden ? 1 : 0);
+        result = 31 * result + (internal ? 1 : 0);
+        result = 31 * result + Arrays.hashCode(fieldNames);
+        result = 31 * result + Arrays.hashCode(positionIndices);
+        return result;
     }
 
     @Override
     public String toString() {
-        return toStringHelper(this)
-                .add("connectorId", connectorId)
-                .add("name", name)
-                .add("type", type)
-                .add("hidden", hidden)
-                .add("internal", internal)
-                .add("positionIndex", positionIndex)
-                .toString();
+        return "PulsarColumnHandle{" +
+                "connectorId='" + connectorId + '\'' +
+                ", name='" + name + '\'' +
+                ", type=" + type +
+                ", hidden=" + hidden +
+                ", internal=" + internal +
+                ", fieldNames=" + Arrays.toString(fieldNames) +
+                ", positionIndices=" + Arrays.toString(positionIndices) +
+                '}';
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 2e23c87..9a484ba 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -21,19 +21,25 @@ package org.apache.pulsar.sql.presto;
 import com.facebook.presto.spi.ColumnMetadata;
 import com.facebook.presto.spi.type.Type;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class PulsarColumnMetadata extends ColumnMetadata {
 
     private boolean isInternal;
-    private Integer positionIndex;
     // need this because presto ColumnMetadata saves name in lowercase
     private String nameWithCase;
+    private String[] fieldNames;
+    private Integer[] positionIndices;
 
     public PulsarColumnMetadata(String name, Type type, String comment, String extraInfo,
-                                boolean hidden, boolean isInternal, Integer positionIndex) {
+                                boolean hidden, boolean isInternal,
+                                String[] fieldNames, Integer[] positionIndices) {
         super(name, type, comment, extraInfo, hidden);
         this.nameWithCase = name;
         this.isInternal = isInternal;
-        this.positionIndex = positionIndex;
+        this.fieldNames = fieldNames;
+        this.positionIndices = positionIndices;
     }
 
     public String getNameWithCase() {
@@ -44,30 +50,22 @@ public class PulsarColumnMetadata extends ColumnMetadata {
         return isInternal;
     }
 
-    public int getPositionIndex() {
-        return positionIndex;
+    public String[] getFieldNames() {
+        return fieldNames;
     }
 
+    public Integer[] getPositionIndices() {
+        return positionIndices;
+    }
 
     @Override
     public String toString() {
-        StringBuilder sb = new StringBuilder("PulsarColumnMetadata{");
-        sb.append("name='").append(getName()).append('\'');
-        sb.append(", type=").append(getType());
-        if (getComment() != null) {
-            sb.append(", comment='").append(getComment()).append('\'');
-        }
-        if (getExtraInfo() != null) {
-            sb.append(", extraInfo='").append(getExtraInfo()).append('\'');
-        }
-        if (isHidden()) {
-            sb.append(", hidden");
-        }
-        if (isInternal()) {
-            sb.append(", internal");
-        }
-        sb.append('}');
-        return sb.toString();
+        return "PulsarColumnMetadata{" +
+                "isInternal=" + isInternal +
+                ", nameWithCase='" + nameWithCase + '\'' +
+                ", fieldNames=" + Arrays.toString(fieldNames) +
+                ", positionIndices=" + Arrays.toString(positionIndices) +
+                '}';
     }
 
     @Override
@@ -78,13 +76,19 @@ public class PulsarColumnMetadata extends ColumnMetadata {
 
         PulsarColumnMetadata that = (PulsarColumnMetadata) o;
 
-        return isInternal == that.isInternal;
+        if (isInternal != that.isInternal) return false;
+        if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
+        if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+        return Arrays.deepEquals(positionIndices, that.positionIndices);
     }
 
     @Override
     public int hashCode() {
         int result = super.hashCode();
         result = 31 * result + (isInternal ? 1 : 0);
+        result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() : 0);
+        result = 31 * result + Arrays.hashCode(fieldNames);
+        result = 31 * result + Arrays.hashCode(positionIndices);
         return result;
     }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 8341775..76da585 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -179,11 +179,11 @@ public abstract class PulsarInternalColumn {
                 getName(),
                 getType(),
                 hidden,
-                true, null);
+                true, null, null);
     }
 
     PulsarColumnMetadata getColumnMetadata(boolean hidden) {
-        return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null);
+        return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null, null);
     }
 
     public static Set<PulsarInternalColumn> getInternalFields() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 6e30ea5..f283ea2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -32,14 +32,11 @@ import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.BooleanType;
 import com.facebook.presto.spi.type.DoubleType;
 import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.RealType;
-import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -60,23 +57,23 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import javax.inject.Inject;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.Stack;
 import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.TimeType.TIME;
 import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static java.util.Objects.requireNonNull;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
 
 public class PulsarMetadata implements ConnectorMetadata {
 
@@ -183,7 +180,8 @@ public class PulsarMetadata implements ConnectorMetadata {
                         pulsarColumnMetadata.getType(),
                         pulsarColumnMetadata.isHidden(),
                         pulsarColumnMetadata.isInternal(),
-                        pulsarColumnMetadata.getPositionIndex());
+                        pulsarColumnMetadata.getFieldNames(),
+                        pulsarColumnMetadata.getPositionIndices());
 
                 columnHandles.put(
                         columnMetadata.getName(),
@@ -289,11 +287,8 @@ public class PulsarMetadata implements ConnectorMetadata {
 
         ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
 
-        List<Schema.Field> fields = schema.getFields();
-        for (int i = 0; i < fields.size(); i++) {
-            Schema.Field field = fields.get(i);
-            builder.addAll(getColumns(field.name(), field.schema(), i));
-        }
+        builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
+
         if (withInternalColumns) {
             PulsarInternalColumn.getInternalFields().stream().forEach(new Consumer<PulsarInternalColumn>() {
                 @Override
@@ -306,15 +301,21 @@ public class PulsarMetadata implements ConnectorMetadata {
         return new ConnectorTableMetadata(schemaTableName, builder.build());
     }
 
-    // TODO support nested fields
-    private List<PulsarColumnMetadata> getColumns(String name, Schema fieldSchema, int index) {
+
+    @VisibleForTesting
+    static List<PulsarColumnMetadata> getColumns(String fieldName, Schema fieldSchema,
+                                                  Set<String> fieldTypes,
+                                                  Stack<String> fieldNames,
+                                                  Stack<Integer> positionIndices) {
 
         List<PulsarColumnMetadata> columnMetadataList = new LinkedList<>();
 
         if (isPrimitiveType(fieldSchema.getType())) {
-            columnMetadataList.add(new PulsarColumnMetadata(name,
+            columnMetadataList.add(new PulsarColumnMetadata(fieldName,
                     convertType(fieldSchema.getType(), fieldSchema.getLogicalType()),
-                    null, null, false, false, index));
+                    null, null, false, false,
+                    fieldNames.toArray(new String[fieldNames.size()]),
+                    positionIndices.toArray(new Integer[positionIndices.size()])));
         } else if (fieldSchema.getType() == Schema.Type.UNION) {
             boolean canBeNull = false;
             for (Schema type : fieldSchema.getTypes()) {
@@ -322,22 +323,53 @@ public class PulsarMetadata implements ConnectorMetadata {
                     PulsarColumnMetadata columnMetadata;
                     if (type.getType() != Schema.Type.NULL) {
                         if (!canBeNull) {
-                            columnMetadata = new PulsarColumnMetadata(name,
+                            columnMetadata = new PulsarColumnMetadata(fieldName,
                                     convertType(type.getType(), type.getLogicalType()),
-                                    null, null, false, false, index);
+                                    null, null, false, false,
+                                    fieldNames.toArray(new String[fieldNames.size()]),
+                                    positionIndices.toArray(new Integer[positionIndices.size()]));
                         } else {
-                            columnMetadata = new PulsarColumnMetadata(name,
+                            columnMetadata = new PulsarColumnMetadata(fieldName,
                                     convertType(type.getType(), type.getLogicalType()),
-                                    "field can be null", null, false, false, index);
+                                    "field can be null", null, false, false,
+                                    fieldNames.toArray(new String[fieldNames.size()]),
+                                    positionIndices.toArray(new Integer[positionIndices.size()]));
                         }
                         columnMetadataList.add(columnMetadata);
                     } else {
                         canBeNull = true;
                     }
+                } else {
+                    List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices);
+                    columnMetadataList.addAll(columns);
+
                 }
             }
         } else if (fieldSchema.getType() == Schema.Type.RECORD) {
+            // check if we have seen this type before to prevent cyclic class definitions.
+            if (!fieldTypes.contains(fieldSchema.getFullName())) {
+                // add to types seen so far in traversal
+                fieldTypes.add(fieldSchema.getFullName());
+                List<Schema.Field> fields = fieldSchema.getFields();
+                for (int i = 0; i < fields.size(); i++) {
+                    Schema.Field field = fields.get(i);
+                    fieldNames.push(field.name());
+                    positionIndices.push(i);
+                    List<PulsarColumnMetadata> columns;
+                    if (fieldName == null) {
+                        columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices);
+                    } else {
+                        columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
 
+                    }
+                    positionIndices.pop();
+                    fieldNames.pop();
+                    columnMetadataList.addAll(columns);
+                }
+                fieldTypes.remove(fieldSchema.getFullName());
+            } else {
+                log.debug("Already seen type: %s", fieldSchema.getFullName());
+            }
         } else if (fieldSchema.getType() == Schema.Type.ARRAY) {
 
         } else if (fieldSchema.getType() == Schema.Type.MAP) {
@@ -383,7 +415,8 @@ public class PulsarMetadata implements ConnectorMetadata {
         }
     }
 
-    private boolean isPrimitiveType(Schema.Type type) {
+    @VisibleForTesting
+    static boolean isPrimitiveType(Schema.Type type) {
         return Schema.Type.NULL == type
                 || Schema.Type.BOOLEAN == type
                 || Schema.Type.INT == type
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 8593c55..ef56e6c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.sql.presto;
 
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -45,19 +44,14 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import java.io.IOException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.TimeUnit;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
 import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.DateTimeEncoding.unpackMillisUtc;
 import static com.facebook.presto.spi.type.DateType.DATE;
 import static com.facebook.presto.spi.type.IntegerType.INTEGER;
 import static com.facebook.presto.spi.type.RealType.REAL;
@@ -67,7 +61,6 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
 import static com.facebook.presto.spi.type.TinyintType.TINYINT;
 import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public class PulsarRecordCursor implements RecordCursor {
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 4da7ea5..0882efc 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -59,11 +59,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,7 +80,6 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -111,6 +110,7 @@ public abstract class TestPulsarConnector {
     protected static Map<String, SchemaInfo> topicsToSchemas;
     protected static Map<String, Long> topicsToNumEntries;
 
+
     protected static final NamespaceName NAMESPACE_NAME_1 = NamespaceName.get("tenant-1", "ns-1");
     protected static final NamespaceName NAMESPACE_NAME_2 = NamespaceName.get("tenant-1", "ns-2");
     protected static final NamespaceName NAMESPACE_NAME_3 = NamespaceName.get("tenant-2", "ns-1");
@@ -136,195 +136,440 @@ public abstract class TestPulsarConnector {
     protected static final TopicName PARTITIONED_TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4,
             "partitioned-topic-2");
 
+
     public static class Foo {
-        int field1;
-        String field2;
-        float field3;
-        double field4;
-        boolean field5;
-        long field6;
+        public static class Bar {
+            public int field1;
+        }
+
+        public int field1;
+        public String field2;
+        public float field3;
+        public double field4;
+        public boolean field5;
+        public long field6;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", \"logicalType\": \"timestamp-millis\" }")
-        protected long timestamp;
+        public long timestamp;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"time-millis\" }")
-        protected int time;
+        public int time;
         @org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
-        protected int date;
+        public int date;
+        public TestPulsarConnector.Bar bar;
+    }
 
-        public int getField1() {
-            return field1;
-        }
+    public static class Bar {
+        public Integer field1;
+        public String field2;
+        public Boo test;
+        public float field3;
+        public Boo test2;
+    }
 
-        public void setField1(int field1) {
-            this.field1 = field1;
-        }
+    public static class Boo {
+        public Double field4;
+        public Boolean field5;
+        public long field6;
+        // for test cyclic definitions
+        public Foo foo;
+        public Boo boo;
+        public Bar bar;
+        // different namespace with same classname should work though
+        public Foo.Bar foobar;
+    }
 
-        public String getField2() {
-            return field2;
-        }
+    protected static Map<String, Type> fooTypes;
+    protected static List<PulsarColumnHandle> fooColumnHandles;
+    protected static Map<TopicName, PulsarSplit> splits;
+    protected static Map<String, String[]> fooFieldNames;
+    protected static Map<String, Integer[]> fooPositionIndices;
+    protected static Map<String, Function<Integer, Object>> fooFunctions;
 
-        public void setField2(String field2) {
-            this.field2 = field2;
-        }
+    static {
+        try {
+            topicNames = new LinkedList<>();
+            topicNames.add(TOPIC_1);
+            topicNames.add(TOPIC_2);
+            topicNames.add(TOPIC_3);
+            topicNames.add(TOPIC_4);
+            topicNames.add(TOPIC_5);
+            topicNames.add(TOPIC_6);
+
+            partitionedTopicNames = new LinkedList<>();
+            partitionedTopicNames.add(PARTITIONED_TOPIC_1);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_2);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_3);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_4);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_5);
+            partitionedTopicNames.add(PARTITIONED_TOPIC_6);
+
+            partitionedTopicsToPartitions = new HashMap<>();
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 4);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 5);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 6);
+            partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
+
+            topicsToSchemas = new HashMap<>();
+            topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+
+            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+            fooTypes = new HashMap<>();
+            fooTypes.put("field1", IntegerType.INTEGER);
+            fooTypes.put("field2", VarcharType.VARCHAR);
+            fooTypes.put("field3", RealType.REAL);
+            fooTypes.put("field4", DoubleType.DOUBLE);
+            fooTypes.put("field5", BooleanType.BOOLEAN);
+            fooTypes.put("field6", BigintType.BIGINT);
+            fooTypes.put("timestamp", TIMESTAMP);
+            fooTypes.put("time", TIME);
+            fooTypes.put("date", DATE);
+            fooTypes.put("bar.field1", IntegerType.INTEGER);
+            fooTypes.put("bar.field2", VarcharType.VARCHAR);
+            fooTypes.put("bar.test.field4", DoubleType.DOUBLE);
+            fooTypes.put("bar.test.field5", BooleanType.BOOLEAN);
+            fooTypes.put("bar.test.field6", BigintType.BIGINT);
+            fooTypes.put("bar.test.foobar.field1", IntegerType.INTEGER);
+            fooTypes.put("bar.field3", RealType.REAL);
+            fooTypes.put("bar.test2.field4", DoubleType.DOUBLE);
+            fooTypes.put("bar.test2.field5", BooleanType.BOOLEAN);
+            fooTypes.put("bar.test2.field6", BigintType.BIGINT);
+            fooTypes.put("bar.test2.foobar.field1", IntegerType.INTEGER);
+
+            topicsToNumEntries = new HashMap<>();
+            topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
+            topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
+            topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
+            topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
+            topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
+            topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
+            topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
+
+            fooFieldNames = new HashMap<>();
+            fooPositionIndices = new HashMap<>();
+            fooColumnHandles = new LinkedList<>();
+
+            String[] fieldNames1 = {"field1"};
+            Integer[] positionIndices1 = {0};
+            fooFieldNames.put("field1", fieldNames1);
+            fooPositionIndices.put("field1", positionIndices1);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field1",
+                    fooTypes.get("field1"),
+                    false,
+                    false,
+                    fooFieldNames.get("field1"),
+                    fooPositionIndices.get("field1")));
 
-        public float getField3() {
-            return field3;
-        }
 
-        public void setField3(float field3) {
-            this.field3 = field3;
-        }
+            String[] fieldNames2 = {"field2"};
+            Integer[] positionIndices2 = {1};
+            fooFieldNames.put("field2", fieldNames2);
+            fooPositionIndices.put("field2", positionIndices2);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field2",
+                    fooTypes.get("field2"),
+                    false,
+                    false,
+                    fieldNames2,
+                    positionIndices2));
 
-        public double getField4() {
-            return field4;
-        }
+            String[] fieldNames3 = {"field3"};
+            Integer[] positionIndices3 = {2};
+            fooFieldNames.put("field3", fieldNames3);
+            fooPositionIndices.put("field3", positionIndices3);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field3",
+                    fooTypes.get("field3"),
+                    false,
+                    false,
+                    fieldNames3,
+                    positionIndices3));
 
-        public void setField4(double field4) {
-            this.field4 = field4;
-        }
+            String[] fieldNames4 = {"field4"};
+            Integer[] positionIndices4 = {3};
+            fooFieldNames.put("field4", fieldNames4);
+            fooPositionIndices.put("field4", positionIndices4);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field4",
+                    fooTypes.get("field4"),
+                    false,
+                    false,
+                    fieldNames4,
+                    positionIndices4));
 
-        public boolean isField5() {
-            return field5;
-        }
 
-        public void setField5(boolean field5) {
-            this.field5 = field5;
-        }
+            String[] fieldNames5 = {"field5"};
+            Integer[] positionIndices5 = {4};
+            fooFieldNames.put("field5", fieldNames5);
+            fooPositionIndices.put("field5", positionIndices5);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field5",
+                    fooTypes.get("field5"),
+                    false,
+                    false,
+                    fieldNames5,
+                    positionIndices5));
 
-        public long getField6() {
-            return field6;
-        }
+            String[] fieldNames6 = {"field6"};
+            Integer[] positionIndices6 = {5};
+            fooFieldNames.put("field6", fieldNames6);
+            fooPositionIndices.put("field6", positionIndices6);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "field6",
+                    fooTypes.get("field6"),
+                    false,
+                    false,
+                    fieldNames6,
+                    positionIndices6));
 
-        public void setField6(long field6) {
-            this.field6 = field6;
-        }
+            String[] fieldNames7 = {"timestamp"};
+            Integer[] positionIndices7 = {6};
+            fooFieldNames.put("timestamp", fieldNames7);
+            fooPositionIndices.put("timestamp", positionIndices7);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "timestamp",
+                    fooTypes.get("timestamp"),
+                    false,
+                    false,
+                    fieldNames7,
+                    positionIndices7));
 
-        public long getTimestamp() {
-            return timestamp;
-        }
+            String[] fieldNames8 = {"time"};
+            Integer[] positionIndices8 = {7};
+            fooFieldNames.put("time", fieldNames8);
+            fooPositionIndices.put("time", positionIndices8);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "time",
+                    fooTypes.get("time"),
+                    false,
+                    false,
+                    fieldNames8,
+                    positionIndices8));
 
-        public void setTimestamp(long timestamp) {
-            this.timestamp = timestamp;
-        }
+            String[] fieldNames9 = {"date"};
+            Integer[] positionIndices9 = {8};
+            fooFieldNames.put("date", fieldNames9);
+            fooPositionIndices.put("date", positionIndices9);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "date",
+                    fooTypes.get("date"),
+                    false,
+                    false,
+                    fieldNames9,
+                    positionIndices9));
 
-        public int getTime() {
-            return time;
-        }
+            String[] bar_fieldNames1 = {"bar", "field1"};
+            Integer[] bar_positionIndices1 = {9, 0};
+            fooFieldNames.put("bar.field1", bar_fieldNames1);
+            fooPositionIndices.put("bar.field1", bar_positionIndices1);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field1",
+                    fooTypes.get("bar.field1"),
+                    false,
+                    false,
+                    bar_fieldNames1,
+                    bar_positionIndices1));
 
-        public void setTime(int time) {
-            this.time = time;
-        }
+            String[] bar_fieldNames2 = {"bar", "field2"};
+            Integer[] bar_positionIndices2 = {9, 1};
+            fooFieldNames.put("bar.field2", bar_fieldNames2);
+            fooPositionIndices.put("bar.field2", bar_positionIndices2);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field2",
+                    fooTypes.get("bar.field2"),
+                    false,
+                    false,
+                    bar_fieldNames2,
+                    bar_positionIndices2));
 
-        public int getDate() {
-            return date;
-        }
+            String[] bar_test_fieldNames4 = {"bar", "test", "field4"};
+            Integer[] bar_test_positionIndices4 = {9, 2, 0};
+            fooFieldNames.put("bar.test.field4", bar_test_fieldNames4);
+            fooPositionIndices.put("bar.test.field4", bar_test_positionIndices4);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field4",
+                    fooTypes.get("bar.test.field4"),
+                    false,
+                    false,
+                    bar_test_fieldNames4,
+                    bar_test_positionIndices4));
 
-        public void setDate(int date) {
-            this.date = date;
-        }
-    }
+            String[] bar_test_fieldNames5 = {"bar", "test", "field5"};
+            Integer[] bar_test_positionIndices5 = {9, 2, 1};
+            fooFieldNames.put("bar.test.field5", bar_test_fieldNames5);
+            fooPositionIndices.put("bar.test.field5", bar_test_positionIndices5);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field5",
+                    fooTypes.get("bar.test.field5"),
+                    false,
+                    false,
+                    bar_test_fieldNames5,
+                    bar_test_positionIndices5));
 
-    protected static Map<String, Type> fooTypes;
-    protected static List<PulsarColumnHandle> fooColumnHandles;
-    protected static Map<TopicName, PulsarSplit> splits;
+            String[] bar_test_fieldNames6 = {"bar", "test", "field6"};
+            Integer[] bar_test_positionIndices6 = {9, 2, 2};
+            fooFieldNames.put("bar.test.field6", bar_test_fieldNames6);
+            fooPositionIndices.put("bar.test.field6", bar_test_positionIndices6);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test.field6",
+                    fooTypes.get("bar.test.field6"),
+                    false,
+                    false,
+                    bar_test_fieldNames6,
+                    bar_test_positionIndices6));
 
-    static {
-        topicNames = new LinkedList<>();
-        topicNames.add(TOPIC_1);
-        topicNames.add(TOPIC_2);
-        topicNames.add(TOPIC_3);
-        topicNames.add(TOPIC_4);
-        topicNames.add(TOPIC_5);
-        topicNames.add(TOPIC_6);
-
-        partitionedTopicNames = new LinkedList<>();
-        partitionedTopicNames.add(PARTITIONED_TOPIC_1);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_2);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_3);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_4);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_5);
-        partitionedTopicNames.add(PARTITIONED_TOPIC_6);
-
-        partitionedTopicsToPartitions = new HashMap<>();
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 4);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 5);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 6);
-        partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
-
-        topicsToSchemas = new HashMap<>();
-        topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
-
-        topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-        topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
-        fooTypes = new HashMap<>();
-        fooTypes.put("field1", IntegerType.INTEGER);
-        fooTypes.put("field2", VarcharType.VARCHAR);
-        fooTypes.put("field3", RealType.REAL);
-        fooTypes.put("field4", DoubleType.DOUBLE);
-        fooTypes.put("field5", BooleanType.BOOLEAN);
-        fooTypes.put("field6", BigintType.BIGINT);
-        fooTypes.put("timestamp", TIMESTAMP);
-        fooTypes.put("time", TIME);
-        fooTypes.put("date", DATE);
-
-        topicsToNumEntries = new HashMap<>();
-        topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
-        topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
-        topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
-        topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
-        topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
-        topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
-        topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
-
-        fooColumnHandles = new LinkedList<>();
-        for (int i = 0; i < Foo.class.getDeclaredFields().length; i++) {
-            Field field = Foo.class.getDeclaredFields()[i];
+            String[] bar_test_foobar_fieldNames1 = {"bar", "test", "foobar", "field1"};
+            Integer[] bar_test_foobar_positionIndices1 = {9, 2, 6, 0};
+            fooFieldNames.put("bar.test.foobar.field1", bar_test_foobar_fieldNames1);
+            fooPositionIndices.put("bar.test.foobar.field1", bar_test_foobar_positionIndices1);
             fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
-                    field.getName(),
-                    fooTypes.get(field.getName()),
+                    "bar.test.foobar.field1",
+                    fooTypes.get("bar.test.foobar.field1"),
                     false,
                     false,
-                    i));
-        }
-        fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
-                new Function<PulsarInternalColumn, PulsarColumnHandle>() {
-                    @Override
-                    public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
-                        return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
-                    }
-                }).collect(Collectors.toList()));
-
-        splits = new HashMap<>();
-
-        List<TopicName> allTopics = new LinkedList<>();
-        allTopics.addAll(topicNames);
-        allTopics.addAll(partitionedTopicNames);
-
-        for (TopicName topicName : allTopics) {
-            splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
-                    topicName.getNamespace(), topicName.getLocalName(),
-                    topicsToNumEntries.get(topicName.getSchemaName()),
-                    new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
-                    topicsToSchemas.get(topicName.getSchemaName()).getType(),
-                    0, topicsToNumEntries.get(topicName.getSchemaName()),
-                    0, 0, TupleDomain.all()));
+                    bar_test_foobar_fieldNames1,
+                    bar_test_foobar_positionIndices1));
+
+            String[] bar_field3 = {"bar", "field3"};
+            Integer[] bar_positionIndices3 = {9, 3};
+            fooFieldNames.put("bar.field3", bar_field3);
+            fooPositionIndices.put("bar.field3", bar_positionIndices3);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.field3",
+                    fooTypes.get("bar.field3"),
+                    false,
+                    false,
+                    bar_field3,
+                    bar_positionIndices3));
+
+            String[] bar_test2_fieldNames4 = {"bar", "test2", "field4"};
+            Integer[] bar_test2_positionIndices4 = {9, 4, 0};
+            fooFieldNames.put("bar.test2.field4", bar_test2_fieldNames4);
+            fooPositionIndices.put("bar.test2.field4", bar_test2_positionIndices4);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field4",
+                    fooTypes.get("bar.test2.field4"),
+                    false,
+                    false,
+                    bar_test2_fieldNames4,
+                    bar_test2_positionIndices4));
+
+            String[] bar_test2_fieldNames5 = {"bar", "test2", "field5"};
+            Integer[] bar_test2_positionIndices5 = {9, 4, 1};
+            fooFieldNames.put("bar.test2.field5", bar_test2_fieldNames5);
+            fooPositionIndices.put("bar.test2.field5", bar_test2_positionIndices5);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field5",
+                    fooTypes.get("bar.test2.field5"),
+                    false,
+                    false,
+                    bar_test2_fieldNames5,
+                    bar_test2_positionIndices5));
+
+            String[] bar_test2_fieldNames6 = {"bar", "test2", "field6"};
+            Integer[] bar_test2_positionIndices6 = {9, 4, 2};
+            fooFieldNames.put("bar.test2.field6", bar_test2_fieldNames6);
+            fooPositionIndices.put("bar.test2.field6", bar_test2_positionIndices6);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.field6",
+                    fooTypes.get("bar.test2.field6"),
+                    false,
+                    false,
+                    bar_test2_fieldNames6,
+                    bar_test2_positionIndices6));
+
+            String[] bar_test2_foobar_fieldNames1 = {"bar", "test2", "foobar", "field1"};
+            Integer[] bar_test2_foobar_positionIndices1 = {9, 4, 6, 0};
+            fooFieldNames.put("bar.test2.foobar.field1", bar_test2_foobar_fieldNames1);
+            fooPositionIndices.put("bar.test2.foobar.field1", bar_test2_foobar_positionIndices1);
+            fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+                    "bar.test2.foobar.field1",
+                    fooTypes.get("bar.test2.foobar.field1"),
+                    false,
+                    false,
+                    bar_test2_foobar_fieldNames1,
+                    bar_test2_foobar_positionIndices1));
+
+            fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
+                    new Function<PulsarInternalColumn, PulsarColumnHandle>() {
+                        @Override
+                        public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
+                            return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
+                        }
+                    }).collect(Collectors.toList()));
+
+            splits = new HashMap<>();
+
+            List<TopicName> allTopics = new LinkedList<>();
+            allTopics.addAll(topicNames);
+            allTopics.addAll(partitionedTopicNames);
+
+            for (TopicName topicName : allTopics) {
+                splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
+                        topicName.getNamespace(), topicName.getLocalName(),
+                        topicsToNumEntries.get(topicName.getSchemaName()),
+                        new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
+                        topicsToSchemas.get(topicName.getSchemaName()).getType(),
+                        0, topicsToNumEntries.get(topicName.getSchemaName()),
+                        0, 0, TupleDomain.all()));
+            }
+
+            fooFunctions = new HashMap<>();
+
+            fooFunctions.put("field1", integer -> integer);
+            fooFunctions.put("field2", integer -> String.valueOf(integer));
+            fooFunctions.put("field3", integer -> integer.floatValue());
+            fooFunctions.put("field4", integer -> integer.doubleValue());
+            fooFunctions.put("field5", integer -> integer % 2 == 0);
+            fooFunctions.put("field6", integer -> integer.longValue());
+            fooFunctions.put("timestamp", integer -> System.currentTimeMillis());
+            fooFunctions.put("time", integer -> {
+                LocalTime now = LocalTime.now(ZoneId.systemDefault());
+                return now.toSecondOfDay() * 1000;
+            });
+            fooFunctions.put("date", integer -> {
+                LocalDate localDate = LocalDate.now();
+                LocalDate epoch = LocalDate.ofEpochDay(0);
+                return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+            });
+            fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1);
+            fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2));
+            fooFunctions.put("bar.field3", integer -> integer + 3.0f);
+
+            fooFunctions.put("bar.test.field4", integer -> integer + 1.0);
+            fooFunctions.put("bar.test.field5", integer -> (integer + 1) % 2 == 0);
+            fooFunctions.put("bar.test.field6", integer -> integer + 10L);
+            fooFunctions.put("bar.test.foobar.field1", integer -> integer % 3);
+
+            fooFunctions.put("bar.test2.field4", integer -> integer + 2.0);
+            fooFunctions.put("bar.test2.field5", integer -> (integer + 1) % 32 == 0);
+            fooFunctions.put("bar.test2.field6", integer -> integer + 15L);
+            fooFunctions.put("bar.test2.foobar.field1", integer -> integer % 3);
+
+
+        } catch (Throwable e) {
+            System.out.println("Error: " + e);
+            System.out.println("Stacktrace: " + Arrays.asList(e.getStackTrace()));
+            throw e;
         }
     }
 
@@ -550,21 +795,45 @@ public abstract class TestPulsarConnector {
                         List<Entry> entries = new LinkedList<>();
                         for (int i = 0; i < readEntries; i++) {
 
+                            Foo.Bar foobar = new Foo.Bar();
+                            foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+                            Boo boo1 = new Boo();
+                            boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count);
+                            boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count);
+                            boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count);
+                            boo1.foo = new Foo();
+                            boo1.boo = null;
+                            boo1.bar = new Bar();
+                            boo1.foobar = foobar;
+
+                            Boo boo2 = new Boo();
+                            boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count);
+                            boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count);
+                            boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count);
+                            boo2.foo = new Foo();
+                            boo2.boo = boo1;
+                            boo2.bar = new Bar();
+                            boo2.foobar = foobar;
+
+                            TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar();
+                            bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count);
+                            bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count);
+                            bar.field3 = (float) fooFunctions.get("bar.field3").apply(count);
+                            bar.test = boo1;
+                            bar.test2 = count % 2 == 0 ? null : boo2;
+
                             Foo foo = new Foo();
-                            foo.field1 = count;
-                            foo.field2 = String.valueOf(count);
-                            foo.field3 = count;
-                            foo.field4 = count;
-                            foo.field5 = count % 2 == 0;
-                            foo.field6 = count;
-                            foo.timestamp = System.currentTimeMillis();
-
-                            LocalTime now = LocalTime.now(ZoneId.systemDefault());
-                            foo.time = now.toSecondOfDay() * 1000;
-
-                            LocalDate localDate = LocalDate.now();
-                            LocalDate epoch = LocalDate.ofEpochDay(0);
-                            foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+                            foo.field1 = (int) fooFunctions.get("field1").apply(count);
+                            foo.field2 = (String) fooFunctions.get("field2").apply(count);
+                            foo.field3 = (float) fooFunctions.get("field3").apply(count);
+                            foo.field4 = (double) fooFunctions.get("field4").apply(count);
+                            foo.field5 = (boolean) fooFunctions.get("field5").apply(count);
+                            foo.field6 = (long) fooFunctions.get("field6").apply(count);
+                            foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
+                            foo.time = (int) fooFunctions.get("time").apply(count);
+                            foo.date = (int) fooFunctions.get("date").apply(count);
+                            foo.bar = bar;
 
                             PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
                                     .setProducerName("test-producer").setSequenceId(positions.get(topic))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 7726763..17ce416 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.shade.javax.ws.rs.core.Response;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -85,16 +84,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     }
 
     @Test
-    public void testGetTableLayouts() {
-
-    }
-
-    @Test
-    public void testGetTableLayout() {
-
-    }
-
-    @Test
     public void testGetTableMetadata() {
 
         List<TopicName> allTopics = new LinkedList<>();
@@ -116,12 +105,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
             Assert.assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
 
             Assert.assertEquals(tableMetadata.getColumns().size(),
-                    Foo.class.getDeclaredFields().length + PulsarInternalColumn.getInternalFields().size());
+                    fooColumnHandles.size());
 
-            List<String> fieldNames = new LinkedList<>();
-            for (Field field : Foo.class.getDeclaredFields()) {
-                fieldNames.add(field.getName());
-            }
+            List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
             for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
                 fieldNames.add(internalField.getName());
@@ -278,10 +264,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         Map<String, ColumnHandle> columnHandleMap
                 = new HashMap<>(this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), pulsarTableHandle));
 
-        List<String> fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -298,9 +281,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
                 Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
                         .getSchema()));
                 Assert.assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
-                Assert.assertEquals(pulsarColumnHandle.getName(), schema.getField(field).name());
-                Assert.assertNotNull(pulsarColumnHandle.getPositionIndex());
-                Assert.assertEquals(pulsarColumnHandle.getPositionIndex().intValue(), schema.getField(field).pos());
+                Assert.assertEquals(pulsarColumnHandle.getName(), field);
+                Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
+                Assert.assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
                 Assert.assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
                 Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
             }
@@ -319,13 +302,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         List<ColumnMetadata> columnMetadataList
                 = tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        List<String> fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -345,13 +325,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
@@ -377,13 +354,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         Assert.assertEquals(tableColumnsMap.size(), 1);
         columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
         Assert.assertNotNull(columnMetadataList);
-        Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
-                PulsarInternalColumn.getInternalFields().size());
+        Assert.assertEquals(columnMetadataList.size(),
+                fooColumnHandles.size());
 
-        fieldNames = new LinkedList<>();
-        for (Field field : Foo.class.getDeclaredFields()) {
-            fieldNames.add(field.getName());
-        }
+        fieldNames = new LinkedList<>(fooFieldNames.keySet());
 
         for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
             fieldNames.add(internalField.getName());
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 74126a6..1b323a2 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,16 +19,14 @@
 package org.apache.pulsar.sql.presto;
 
 import io.airlift.log.Logger;
-import io.airlift.slice.Slice;
 import org.apache.pulsar.common.naming.TopicName;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
-import static com.facebook.presto.spi.type.BigintType.BIGINT;
-
 @Test(singleThreaded = true)
 public class TestPulsarRecordCursor extends TestPulsarConnector {
 
@@ -44,31 +42,81 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
             PulsarRecordCursor pulsarRecordCursor = entry.getValue();
             TopicName topicName = entry.getKey();
 
-            long count = 0L;
+            int count = 0;
             while (pulsarRecordCursor.advanceNextPosition()) {
+                List<String> columnsSeen = new LinkedList<>();
                 for (int i = 0; i < fooColumnHandles.size(); i++) {
-
                     if (pulsarRecordCursor.isNull(i)) {
-
+                        columnsSeen.add(fooColumnHandles.get(i).getName());
                     } else {
-                        if (fooColumnHandles.get(i).getType().getJavaType() == long.class) {
-                            if (fooColumnHandles.get(i).getType() == BIGINT) {
-                                Assert.assertEquals(pulsarRecordCursor.getLong(i), count);
-                            }
-                        } else if (fooColumnHandles.get(i).getType().getJavaType() == boolean.class) {
-                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), count % 2 == 0);
-                        } else if (fooColumnHandles.get(i).getType().getJavaType() == double.class) {
-                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), Long.valueOf(count).doubleValue());
-                        } else if (fooColumnHandles.get(i).getType().getJavaType() == Slice.class) {
-                            if (!fooColumnHandles.get(i).isInternal()) {
-                                Assert.assertEquals(pulsarRecordCursor.getSlice(i).toStringUtf8().getBytes(),
-                                        Charset.forName("UTF-8").encode(String.valueOf(count)).array());
-                            }
+                        if (fooColumnHandles.get(i).getName().equals("field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("field2")) {
+                            Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("field3")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("field4")) {
+                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("field5")) {
+                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("timestamp")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("time")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("date")) {
+                            pulsarRecordCursor.getLong(i);
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.field2")) {
+                            Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.field3")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
+                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
+                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
+                            Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
+                            Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
+                        } else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
+                            Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
+                            columnsSeen.add(fooColumnHandles.get(i).getName());
                         } else {
-                            Assert.fail("Unknown type: " + fooColumnHandles.get(i).getType().getJavaType());
+                            if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
+                                columnsSeen.add(fooColumnHandles.get(i).getName());
+                            }
                         }
                     }
                 }
+                Assert.assertEquals(columnsSeen.size(), fooColumnHandles.size());
                 count++;
             }
             Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());