You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/09/07 19:27:35 UTC
[incubator-pulsar] branch master updated: support nested fields in
Pulsar presto connector (#2515)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2023b48 support nested fields in Pulsar presto connector (#2515)
2023b48 is described below
commit 2023b489ade528e622c0f203e5da7d377ec2772a
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Sep 7 12:27:33 2018 -0700
support nested fields in Pulsar presto connector (#2515)
* support nested fields in Pulsar presto connector
* cleaning up
* removing debug messages
---
.../org/apache/pulsar/io/twitter/TweetData.java | 7 -
.../pulsar/sql/presto/AvroSchemaHandler.java | 18 +-
.../pulsar/sql/presto/JSONSchemaHandler.java | 14 +-
.../pulsar/sql/presto/PulsarColumnHandle.java | 81 +--
.../pulsar/sql/presto/PulsarColumnMetadata.java | 50 +-
.../pulsar/sql/presto/PulsarInternalColumn.java | 4 +-
.../apache/pulsar/sql/presto/PulsarMetadata.java | 77 ++-
.../pulsar/sql/presto/PulsarRecordCursor.java | 7 -
.../pulsar/sql/presto/TestPulsarConnector.java | 625 +++++++++++++++------
.../pulsar/sql/presto/TestPulsarMetadata.java | 56 +-
.../pulsar/sql/presto/TestPulsarRecordCursor.java | 90 ++-
11 files changed, 689 insertions(+), 340 deletions(-)
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
index 36d4dc8..e5cb79c 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
@@ -82,13 +82,6 @@ public class TweetData {
private Boolean defaultProfileImage;
}
@Data
- public static class Url {
- private String url;
- private String expandedUrl;
- private String displayUrl;
- private List<Long> indices = null;
- }
- @Data
public static class RetweetedStatus {
private String createdAt;
private Long id;
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
index f8cec40..402a36e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java
@@ -55,9 +55,23 @@ public class AvroSchemaHandler implements SchemaHandler {
public Object extractField(int index, Object currentRecord) {
try {
GenericRecord record = (GenericRecord) currentRecord;
- return record.get(this.columnHandles.get(index).getPositionIndex());
+ PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
+ Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
+ Object curr = record.get(positionIndices[0]);
+ if (curr == null) {
+ return null;
+ }
+ if (positionIndices.length > 0) {
+ for (int i = 1 ; i < positionIndices.length; i++) {
+ curr = ((GenericRecord) curr).get(positionIndices[i]);
+ if (curr == null) {
+ return null;
+ }
+ }
+ }
+ return curr;
} catch (Exception ex) {
- log.error(ex);
+ log.debug(ex,"%s", ex);
}
return null;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
index f72ad0e..aed7e0a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
+import java.util.Arrays;
import java.util.List;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
@@ -54,10 +55,19 @@ public class JSONSchemaHandler implements SchemaHandler {
try {
JsonObject jsonObject = (JsonObject) currentRecord;
PulsarColumnHandle pulsarColumnHandle = columnHandles.get(index);
- JsonElement field = jsonObject.get(pulsarColumnHandle.getName());
+
+ String[] fieldNames = pulsarColumnHandle.getFieldNames();
+ JsonElement field = jsonObject.get(fieldNames[0]);
if (field.isJsonNull()) {
return null;
}
+ for (int i = 1; i < fieldNames.length ; i++) {
+ field = field.getAsJsonObject().get(fieldNames[i]);
+ if (field.isJsonNull()) {
+ return null;
+ }
+ }
+
Type type = pulsarColumnHandle.getType();
Class<?> javaType = type.getJavaType();
@@ -81,7 +91,7 @@ public class JSONSchemaHandler implements SchemaHandler {
return null;
}
} catch (Exception ex) {
- log.error(ex);
+ log.debug(ex,"%s", ex);
}
return null;
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
index def8cc3..f98e864 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java
@@ -24,9 +24,8 @@ import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.Objects;
+import java.util.Arrays;
-import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
public class PulsarColumnHandle implements ColumnHandle {
@@ -53,10 +52,9 @@ public class PulsarColumnHandle implements ColumnHandle {
*/
private final boolean internal;
- /**
- * The index of the field in the schema associated with this column.
- */
- private Integer positionIndex;
+ private final String[] fieldNames;
+
+ private final Integer[] positionIndices;
@JsonCreator
public PulsarColumnHandle(
@@ -65,13 +63,15 @@ public class PulsarColumnHandle implements ColumnHandle {
@JsonProperty("type") Type type,
@JsonProperty("hidden") boolean hidden,
@JsonProperty("internal") boolean internal,
- @JsonProperty("positionIndex") Integer positionIndex) {
+ @JsonProperty("fieldNames") String[] fieldNames,
+ @JsonProperty("positionIndices") Integer[] positionIndices) {
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.name = requireNonNull(name, "name is null");
this.type = requireNonNull(type, "type is null");
this.hidden = hidden;
this.internal = internal;
- this.positionIndex = positionIndex;
+ this.fieldNames = fieldNames;
+ this.positionIndices = positionIndices;
}
@JsonProperty
@@ -100,8 +100,13 @@ public class PulsarColumnHandle implements ColumnHandle {
}
@JsonProperty
- public Integer getPositionIndex() {
- return positionIndex;
+ public String[] getFieldNames() {
+ return fieldNames;
+ }
+
+ @JsonProperty
+ public Integer[] getPositionIndices() {
+ return positionIndices;
}
@@ -110,37 +115,43 @@ public class PulsarColumnHandle implements ColumnHandle {
}
@Override
- public int hashCode() {
- return Objects.hash(connectorId, name, type, hidden, internal);
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PulsarColumnHandle that = (PulsarColumnHandle) o;
+
+ if (hidden != that.hidden) return false;
+ if (internal != that.internal) return false;
+ if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (type != null ? !type.equals(that.type) : that.type != null) return false;
+ if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+ return Arrays.deepEquals(positionIndices, that.positionIndices);
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
-
- PulsarColumnHandle other = (PulsarColumnHandle) obj;
- return Objects.equals(this.connectorId, other.connectorId) &&
- Objects.equals(this.name, other.name) &&
- Objects.equals(this.type, other.type) &&
- Objects.equals(this.hidden, other.hidden) &&
- Objects.equals(this.internal, other.internal) &&
- Objects.equals(this.positionIndex, other.positionIndex);
+ public int hashCode() {
+ int result = connectorId != null ? connectorId.hashCode() : 0;
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ result = 31 * result + (type != null ? type.hashCode() : 0);
+ result = 31 * result + (hidden ? 1 : 0);
+ result = 31 * result + (internal ? 1 : 0);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ result = 31 * result + Arrays.hashCode(positionIndices);
+ return result;
}
@Override
public String toString() {
- return toStringHelper(this)
- .add("connectorId", connectorId)
- .add("name", name)
- .add("type", type)
- .add("hidden", hidden)
- .add("internal", internal)
- .add("positionIndex", positionIndex)
- .toString();
+ return "PulsarColumnHandle{" +
+ "connectorId='" + connectorId + '\'' +
+ ", name='" + name + '\'' +
+ ", type=" + type +
+ ", hidden=" + hidden +
+ ", internal=" + internal +
+ ", fieldNames=" + Arrays.toString(fieldNames) +
+ ", positionIndices=" + Arrays.toString(positionIndices) +
+ '}';
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
index 2e23c87..9a484ba 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java
@@ -21,19 +21,25 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
+import java.util.Arrays;
+import java.util.List;
+
public class PulsarColumnMetadata extends ColumnMetadata {
private boolean isInternal;
- private Integer positionIndex;
// need this because presto ColumnMetadata saves name in lowercase
private String nameWithCase;
+ private String[] fieldNames;
+ private Integer[] positionIndices;
public PulsarColumnMetadata(String name, Type type, String comment, String extraInfo,
- boolean hidden, boolean isInternal, Integer positionIndex) {
+ boolean hidden, boolean isInternal,
+ String[] fieldNames, Integer[] positionIndices) {
super(name, type, comment, extraInfo, hidden);
this.nameWithCase = name;
this.isInternal = isInternal;
- this.positionIndex = positionIndex;
+ this.fieldNames = fieldNames;
+ this.positionIndices = positionIndices;
}
public String getNameWithCase() {
@@ -44,30 +50,22 @@ public class PulsarColumnMetadata extends ColumnMetadata {
return isInternal;
}
- public int getPositionIndex() {
- return positionIndex;
+ public String[] getFieldNames() {
+ return fieldNames;
}
+ public Integer[] getPositionIndices() {
+ return positionIndices;
+ }
@Override
public String toString() {
- StringBuilder sb = new StringBuilder("PulsarColumnMetadata{");
- sb.append("name='").append(getName()).append('\'');
- sb.append(", type=").append(getType());
- if (getComment() != null) {
- sb.append(", comment='").append(getComment()).append('\'');
- }
- if (getExtraInfo() != null) {
- sb.append(", extraInfo='").append(getExtraInfo()).append('\'');
- }
- if (isHidden()) {
- sb.append(", hidden");
- }
- if (isInternal()) {
- sb.append(", internal");
- }
- sb.append('}');
- return sb.toString();
+ return "PulsarColumnMetadata{" +
+ "isInternal=" + isInternal +
+ ", nameWithCase='" + nameWithCase + '\'' +
+ ", fieldNames=" + Arrays.toString(fieldNames) +
+ ", positionIndices=" + Arrays.toString(positionIndices) +
+ '}';
}
@Override
@@ -78,13 +76,19 @@ public class PulsarColumnMetadata extends ColumnMetadata {
PulsarColumnMetadata that = (PulsarColumnMetadata) o;
- return isInternal == that.isInternal;
+ if (isInternal != that.isInternal) return false;
+ if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
+ if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
+ return Arrays.deepEquals(positionIndices, that.positionIndices);
}
@Override
public int hashCode() {
int result = super.hashCode();
result = 31 * result + (isInternal ? 1 : 0);
+ result = 31 * result + (nameWithCase != null ? nameWithCase.hashCode() : 0);
+ result = 31 * result + Arrays.hashCode(fieldNames);
+ result = 31 * result + Arrays.hashCode(positionIndices);
return result;
}
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
index 8341775..76da585 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java
@@ -179,11 +179,11 @@ public abstract class PulsarInternalColumn {
getName(),
getType(),
hidden,
- true, null);
+ true, null, null);
}
PulsarColumnMetadata getColumnMetadata(boolean hidden) {
- return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null);
+ return new PulsarColumnMetadata(name, type, comment, null, hidden, true, null, null);
}
public static Set<PulsarInternalColumn> getInternalFields() {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 6e30ea5..f283ea2 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -32,14 +32,11 @@ import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.predicate.Domain;
-import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.RealType;
-import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
@@ -60,23 +57,23 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import javax.inject.Inject;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.Stack;
import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
-import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static java.util.Objects.requireNonNull;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
+import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
public class PulsarMetadata implements ConnectorMetadata {
@@ -183,7 +180,8 @@ public class PulsarMetadata implements ConnectorMetadata {
pulsarColumnMetadata.getType(),
pulsarColumnMetadata.isHidden(),
pulsarColumnMetadata.isInternal(),
- pulsarColumnMetadata.getPositionIndex());
+ pulsarColumnMetadata.getFieldNames(),
+ pulsarColumnMetadata.getPositionIndices());
columnHandles.put(
columnMetadata.getName(),
@@ -289,11 +287,8 @@ public class PulsarMetadata implements ConnectorMetadata {
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
- List<Schema.Field> fields = schema.getFields();
- for (int i = 0; i < fields.size(); i++) {
- Schema.Field field = fields.get(i);
- builder.addAll(getColumns(field.name(), field.schema(), i));
- }
+ builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
+
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields().stream().forEach(new Consumer<PulsarInternalColumn>() {
@Override
@@ -306,15 +301,21 @@ public class PulsarMetadata implements ConnectorMetadata {
return new ConnectorTableMetadata(schemaTableName, builder.build());
}
- // TODO support nested fields
- private List<PulsarColumnMetadata> getColumns(String name, Schema fieldSchema, int index) {
+
+ @VisibleForTesting
+ static List<PulsarColumnMetadata> getColumns(String fieldName, Schema fieldSchema,
+ Set<String> fieldTypes,
+ Stack<String> fieldNames,
+ Stack<Integer> positionIndices) {
List<PulsarColumnMetadata> columnMetadataList = new LinkedList<>();
if (isPrimitiveType(fieldSchema.getType())) {
- columnMetadataList.add(new PulsarColumnMetadata(name,
+ columnMetadataList.add(new PulsarColumnMetadata(fieldName,
convertType(fieldSchema.getType(), fieldSchema.getLogicalType()),
- null, null, false, false, index));
+ null, null, false, false,
+ fieldNames.toArray(new String[fieldNames.size()]),
+ positionIndices.toArray(new Integer[positionIndices.size()])));
} else if (fieldSchema.getType() == Schema.Type.UNION) {
boolean canBeNull = false;
for (Schema type : fieldSchema.getTypes()) {
@@ -322,22 +323,53 @@ public class PulsarMetadata implements ConnectorMetadata {
PulsarColumnMetadata columnMetadata;
if (type.getType() != Schema.Type.NULL) {
if (!canBeNull) {
- columnMetadata = new PulsarColumnMetadata(name,
+ columnMetadata = new PulsarColumnMetadata(fieldName,
convertType(type.getType(), type.getLogicalType()),
- null, null, false, false, index);
+ null, null, false, false,
+ fieldNames.toArray(new String[fieldNames.size()]),
+ positionIndices.toArray(new Integer[positionIndices.size()]));
} else {
- columnMetadata = new PulsarColumnMetadata(name,
+ columnMetadata = new PulsarColumnMetadata(fieldName,
convertType(type.getType(), type.getLogicalType()),
- "field can be null", null, false, false, index);
+ "field can be null", null, false, false,
+ fieldNames.toArray(new String[fieldNames.size()]),
+ positionIndices.toArray(new Integer[positionIndices.size()]));
}
columnMetadataList.add(columnMetadata);
} else {
canBeNull = true;
}
+ } else {
+ List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices);
+ columnMetadataList.addAll(columns);
+
}
}
} else if (fieldSchema.getType() == Schema.Type.RECORD) {
+ // check if we have seen this type before to prevent cyclic class definitions.
+ if (!fieldTypes.contains(fieldSchema.getFullName())) {
+ // add to types seen so far in traversal
+ fieldTypes.add(fieldSchema.getFullName());
+ List<Schema.Field> fields = fieldSchema.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ Schema.Field field = fields.get(i);
+ fieldNames.push(field.name());
+ positionIndices.push(i);
+ List<PulsarColumnMetadata> columns;
+ if (fieldName == null) {
+ columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices);
+ } else {
+ columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
+ }
+ positionIndices.pop();
+ fieldNames.pop();
+ columnMetadataList.addAll(columns);
+ }
+ fieldTypes.remove(fieldSchema.getFullName());
+ } else {
+ log.debug("Already seen type: %s", fieldSchema.getFullName());
+ }
} else if (fieldSchema.getType() == Schema.Type.ARRAY) {
} else if (fieldSchema.getType() == Schema.Type.MAP) {
@@ -383,7 +415,8 @@ public class PulsarMetadata implements ConnectorMetadata {
}
}
- private boolean isPrimitiveType(Schema.Type type) {
+ @VisibleForTesting
+ static boolean isPrimitiveType(Schema.Type type) {
return Schema.Type.NULL == type
|| Schema.Type.BOOLEAN == type
|| Schema.Type.INT == type
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 8593c55..ef56e6c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
-import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
@@ -45,19 +44,14 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import java.io.IOException;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.TimeUnit;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.DateTimeEncoding.packDateTimeWithZone;
-import static com.facebook.presto.spi.type.DateTimeEncoding.unpackMillisUtc;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.IntegerType.INTEGER;
import static com.facebook.presto.spi.type.RealType.REAL;
@@ -67,7 +61,6 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static com.facebook.presto.spi.type.TinyintType.TINYINT;
import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class PulsarRecordCursor implements RecordCursor {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 4da7ea5..0882efc 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -59,11 +59,11 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -80,7 +80,6 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -111,6 +110,7 @@ public abstract class TestPulsarConnector {
protected static Map<String, SchemaInfo> topicsToSchemas;
protected static Map<String, Long> topicsToNumEntries;
+
protected static final NamespaceName NAMESPACE_NAME_1 = NamespaceName.get("tenant-1", "ns-1");
protected static final NamespaceName NAMESPACE_NAME_2 = NamespaceName.get("tenant-1", "ns-2");
protected static final NamespaceName NAMESPACE_NAME_3 = NamespaceName.get("tenant-2", "ns-1");
@@ -136,195 +136,440 @@ public abstract class TestPulsarConnector {
protected static final TopicName PARTITIONED_TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4,
"partitioned-topic-2");
+
public static class Foo {
- int field1;
- String field2;
- float field3;
- double field4;
- boolean field5;
- long field6;
+ public static class Bar {
+ public int field1;
+ }
+
+ public int field1;
+ public String field2;
+ public float field3;
+ public double field4;
+ public boolean field5;
+ public long field6;
@org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", \"logicalType\": \"timestamp-millis\" }")
- protected long timestamp;
+ public long timestamp;
@org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"time-millis\" }")
- protected int time;
+ public int time;
@org.apache.avro.reflect.AvroSchema("{ \"type\": \"int\", \"logicalType\": \"date\" }")
- protected int date;
+ public int date;
+ public TestPulsarConnector.Bar bar;
+ }
- public int getField1() {
- return field1;
- }
+ public static class Bar {
+ public Integer field1;
+ public String field2;
+ public Boo test;
+ public float field3;
+ public Boo test2;
+ }
- public void setField1(int field1) {
- this.field1 = field1;
- }
+ public static class Boo {
+ public Double field4;
+ public Boolean field5;
+ public long field6;
+ // for test cyclic definitions
+ public Foo foo;
+ public Boo boo;
+ public Bar bar;
+ // different namespace with same classname should work though
+ public Foo.Bar foobar;
+ }
- public String getField2() {
- return field2;
- }
+ protected static Map<String, Type> fooTypes;
+ protected static List<PulsarColumnHandle> fooColumnHandles;
+ protected static Map<TopicName, PulsarSplit> splits;
+ protected static Map<String, String[]> fooFieldNames;
+ protected static Map<String, Integer[]> fooPositionIndices;
+ protected static Map<String, Function<Integer, Object>> fooFunctions;
- public void setField2(String field2) {
- this.field2 = field2;
- }
+ static {
+ try {
+ topicNames = new LinkedList<>();
+ topicNames.add(TOPIC_1);
+ topicNames.add(TOPIC_2);
+ topicNames.add(TOPIC_3);
+ topicNames.add(TOPIC_4);
+ topicNames.add(TOPIC_5);
+ topicNames.add(TOPIC_6);
+
+ partitionedTopicNames = new LinkedList<>();
+ partitionedTopicNames.add(PARTITIONED_TOPIC_1);
+ partitionedTopicNames.add(PARTITIONED_TOPIC_2);
+ partitionedTopicNames.add(PARTITIONED_TOPIC_3);
+ partitionedTopicNames.add(PARTITIONED_TOPIC_4);
+ partitionedTopicNames.add(PARTITIONED_TOPIC_5);
+ partitionedTopicNames.add(PARTITIONED_TOPIC_6);
+
+ partitionedTopicsToPartitions = new HashMap<>();
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 4);
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 5);
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 6);
+ partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
+
+ topicsToSchemas = new HashMap<>();
+ topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+
+ topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+ topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+ topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+
+ fooTypes = new HashMap<>();
+ fooTypes.put("field1", IntegerType.INTEGER);
+ fooTypes.put("field2", VarcharType.VARCHAR);
+ fooTypes.put("field3", RealType.REAL);
+ fooTypes.put("field4", DoubleType.DOUBLE);
+ fooTypes.put("field5", BooleanType.BOOLEAN);
+ fooTypes.put("field6", BigintType.BIGINT);
+ fooTypes.put("timestamp", TIMESTAMP);
+ fooTypes.put("time", TIME);
+ fooTypes.put("date", DATE);
+ fooTypes.put("bar.field1", IntegerType.INTEGER);
+ fooTypes.put("bar.field2", VarcharType.VARCHAR);
+ fooTypes.put("bar.test.field4", DoubleType.DOUBLE);
+ fooTypes.put("bar.test.field5", BooleanType.BOOLEAN);
+ fooTypes.put("bar.test.field6", BigintType.BIGINT);
+ fooTypes.put("bar.test.foobar.field1", IntegerType.INTEGER);
+ fooTypes.put("bar.field3", RealType.REAL);
+ fooTypes.put("bar.test2.field4", DoubleType.DOUBLE);
+ fooTypes.put("bar.test2.field5", BooleanType.BOOLEAN);
+ fooTypes.put("bar.test2.field6", BigintType.BIGINT);
+ fooTypes.put("bar.test2.foobar.field1", IntegerType.INTEGER);
+
+ topicsToNumEntries = new HashMap<>();
+ topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
+ topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
+ topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
+ topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
+ topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
+ topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
+ topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
+
+ fooFieldNames = new HashMap<>();
+ fooPositionIndices = new HashMap<>();
+ fooColumnHandles = new LinkedList<>();
+
+ String[] fieldNames1 = {"field1"};
+ Integer[] positionIndices1 = {0};
+ fooFieldNames.put("field1", fieldNames1);
+ fooPositionIndices.put("field1", positionIndices1);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field1",
+ fooTypes.get("field1"),
+ false,
+ false,
+ fooFieldNames.get("field1"),
+ fooPositionIndices.get("field1")));
- public float getField3() {
- return field3;
- }
- public void setField3(float field3) {
- this.field3 = field3;
- }
+ String[] fieldNames2 = {"field2"};
+ Integer[] positionIndices2 = {1};
+ fooFieldNames.put("field2", fieldNames2);
+ fooPositionIndices.put("field2", positionIndices2);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field2",
+ fooTypes.get("field2"),
+ false,
+ false,
+ fieldNames2,
+ positionIndices2));
- public double getField4() {
- return field4;
- }
+ String[] fieldNames3 = {"field3"};
+ Integer[] positionIndices3 = {2};
+ fooFieldNames.put("field3", fieldNames3);
+ fooPositionIndices.put("field3", positionIndices3);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field3",
+ fooTypes.get("field3"),
+ false,
+ false,
+ fieldNames3,
+ positionIndices3));
- public void setField4(double field4) {
- this.field4 = field4;
- }
+ String[] fieldNames4 = {"field4"};
+ Integer[] positionIndices4 = {3};
+ fooFieldNames.put("field4", fieldNames4);
+ fooPositionIndices.put("field4", positionIndices4);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field4",
+ fooTypes.get("field4"),
+ false,
+ false,
+ fieldNames4,
+ positionIndices4));
- public boolean isField5() {
- return field5;
- }
- public void setField5(boolean field5) {
- this.field5 = field5;
- }
+ String[] fieldNames5 = {"field5"};
+ Integer[] positionIndices5 = {4};
+ fooFieldNames.put("field5", fieldNames5);
+ fooPositionIndices.put("field5", positionIndices5);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field5",
+ fooTypes.get("field5"),
+ false,
+ false,
+ fieldNames5,
+ positionIndices5));
- public long getField6() {
- return field6;
- }
+ String[] fieldNames6 = {"field6"};
+ Integer[] positionIndices6 = {5};
+ fooFieldNames.put("field6", fieldNames6);
+ fooPositionIndices.put("field6", positionIndices6);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "field6",
+ fooTypes.get("field6"),
+ false,
+ false,
+ fieldNames6,
+ positionIndices6));
- public void setField6(long field6) {
- this.field6 = field6;
- }
+ String[] fieldNames7 = {"timestamp"};
+ Integer[] positionIndices7 = {6};
+ fooFieldNames.put("timestamp", fieldNames7);
+ fooPositionIndices.put("timestamp", positionIndices7);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "timestamp",
+ fooTypes.get("timestamp"),
+ false,
+ false,
+ fieldNames7,
+ positionIndices7));
- public long getTimestamp() {
- return timestamp;
- }
+ String[] fieldNames8 = {"time"};
+ Integer[] positionIndices8 = {7};
+ fooFieldNames.put("time", fieldNames8);
+ fooPositionIndices.put("time", positionIndices8);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "time",
+ fooTypes.get("time"),
+ false,
+ false,
+ fieldNames8,
+ positionIndices8));
- public void setTimestamp(long timestamp) {
- this.timestamp = timestamp;
- }
+ String[] fieldNames9 = {"date"};
+ Integer[] positionIndices9 = {8};
+ fooFieldNames.put("date", fieldNames9);
+ fooPositionIndices.put("date", positionIndices9);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "date",
+ fooTypes.get("date"),
+ false,
+ false,
+ fieldNames9,
+ positionIndices9));
- public int getTime() {
- return time;
- }
+ String[] bar_fieldNames1 = {"bar", "field1"};
+ Integer[] bar_positionIndices1 = {9, 0};
+ fooFieldNames.put("bar.field1", bar_fieldNames1);
+ fooPositionIndices.put("bar.field1", bar_positionIndices1);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.field1",
+ fooTypes.get("bar.field1"),
+ false,
+ false,
+ bar_fieldNames1,
+ bar_positionIndices1));
- public void setTime(int time) {
- this.time = time;
- }
+ String[] bar_fieldNames2 = {"bar", "field2"};
+ Integer[] bar_positionIndices2 = {9, 1};
+ fooFieldNames.put("bar.field2", bar_fieldNames2);
+ fooPositionIndices.put("bar.field2", bar_positionIndices2);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.field2",
+ fooTypes.get("bar.field2"),
+ false,
+ false,
+ bar_fieldNames2,
+ bar_positionIndices2));
- public int getDate() {
- return date;
- }
+ String[] bar_test_fieldNames4 = {"bar", "test", "field4"};
+ Integer[] bar_test_positionIndices4 = {9, 2, 0};
+ fooFieldNames.put("bar.test.field4", bar_test_fieldNames4);
+ fooPositionIndices.put("bar.test.field4", bar_test_positionIndices4);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test.field4",
+ fooTypes.get("bar.test.field4"),
+ false,
+ false,
+ bar_test_fieldNames4,
+ bar_test_positionIndices4));
- public void setDate(int date) {
- this.date = date;
- }
- }
+ String[] bar_test_fieldNames5 = {"bar", "test", "field5"};
+ Integer[] bar_test_positionIndices5 = {9, 2, 1};
+ fooFieldNames.put("bar.test.field5", bar_test_fieldNames5);
+ fooPositionIndices.put("bar.test.field5", bar_test_positionIndices5);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test.field5",
+ fooTypes.get("bar.test.field5"),
+ false,
+ false,
+ bar_test_fieldNames5,
+ bar_test_positionIndices5));
- protected static Map<String, Type> fooTypes;
- protected static List<PulsarColumnHandle> fooColumnHandles;
- protected static Map<TopicName, PulsarSplit> splits;
+ String[] bar_test_fieldNames6 = {"bar", "test", "field6"};
+ Integer[] bar_test_positionIndices6 = {9, 2, 2};
+ fooFieldNames.put("bar.test.field6", bar_test_fieldNames6);
+ fooPositionIndices.put("bar.test.field6", bar_test_positionIndices6);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test.field6",
+ fooTypes.get("bar.test.field6"),
+ false,
+ false,
+ bar_test_fieldNames6,
+ bar_test_positionIndices6));
- static {
- topicNames = new LinkedList<>();
- topicNames.add(TOPIC_1);
- topicNames.add(TOPIC_2);
- topicNames.add(TOPIC_3);
- topicNames.add(TOPIC_4);
- topicNames.add(TOPIC_5);
- topicNames.add(TOPIC_6);
-
- partitionedTopicNames = new LinkedList<>();
- partitionedTopicNames.add(PARTITIONED_TOPIC_1);
- partitionedTopicNames.add(PARTITIONED_TOPIC_2);
- partitionedTopicNames.add(PARTITIONED_TOPIC_3);
- partitionedTopicNames.add(PARTITIONED_TOPIC_4);
- partitionedTopicNames.add(PARTITIONED_TOPIC_5);
- partitionedTopicNames.add(PARTITIONED_TOPIC_6);
-
- partitionedTopicsToPartitions = new HashMap<>();
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_1.toString(), 2);
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_2.toString(), 3);
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_3.toString(), 4);
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_4.toString(), 5);
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_5.toString(), 6);
- partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
-
- topicsToSchemas = new HashMap<>();
- topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
-
- topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
- topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-
- fooTypes = new HashMap<>();
- fooTypes.put("field1", IntegerType.INTEGER);
- fooTypes.put("field2", VarcharType.VARCHAR);
- fooTypes.put("field3", RealType.REAL);
- fooTypes.put("field4", DoubleType.DOUBLE);
- fooTypes.put("field5", BooleanType.BOOLEAN);
- fooTypes.put("field6", BigintType.BIGINT);
- fooTypes.put("timestamp", TIMESTAMP);
- fooTypes.put("time", TIME);
- fooTypes.put("date", DATE);
-
- topicsToNumEntries = new HashMap<>();
- topicsToNumEntries.put(TOPIC_1.getSchemaName(), 1233L);
- topicsToNumEntries.put(TOPIC_2.getSchemaName(), 0L);
- topicsToNumEntries.put(TOPIC_3.getSchemaName(), 100L);
- topicsToNumEntries.put(TOPIC_4.getSchemaName(), 12345L);
- topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
- topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_4.getSchemaName(), 0L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_5.getSchemaName(), 800L);
- topicsToNumEntries.put(PARTITIONED_TOPIC_6.getSchemaName(), 1L);
-
- fooColumnHandles = new LinkedList<>();
- for (int i = 0; i < Foo.class.getDeclaredFields().length; i++) {
- Field field = Foo.class.getDeclaredFields()[i];
+ String[] bar_test_foobar_fieldNames1 = {"bar", "test", "foobar", "field1"};
+ Integer[] bar_test_foobar_positionIndices1 = {9, 2, 6, 0};
+ fooFieldNames.put("bar.test.foobar.field1", bar_test_foobar_fieldNames1);
+ fooPositionIndices.put("bar.test.foobar.field1", bar_test_foobar_positionIndices1);
fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
- field.getName(),
- fooTypes.get(field.getName()),
+ "bar.test.foobar.field1",
+ fooTypes.get("bar.test.foobar.field1"),
false,
false,
- i));
- }
- fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
- new Function<PulsarInternalColumn, PulsarColumnHandle>() {
- @Override
- public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
- return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
- }
- }).collect(Collectors.toList()));
-
- splits = new HashMap<>();
-
- List<TopicName> allTopics = new LinkedList<>();
- allTopics.addAll(topicNames);
- allTopics.addAll(partitionedTopicNames);
-
- for (TopicName topicName : allTopics) {
- splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
- topicName.getNamespace(), topicName.getLocalName(),
- topicsToNumEntries.get(topicName.getSchemaName()),
- new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
- topicsToSchemas.get(topicName.getSchemaName()).getType(),
- 0, topicsToNumEntries.get(topicName.getSchemaName()),
- 0, 0, TupleDomain.all()));
+ bar_test_foobar_fieldNames1,
+ bar_test_foobar_positionIndices1));
+
+ String[] bar_field3 = {"bar", "field3"};
+ Integer[] bar_positionIndices3 = {9, 3};
+ fooFieldNames.put("bar.field3", bar_field3);
+ fooPositionIndices.put("bar.field3", bar_positionIndices3);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.field3",
+ fooTypes.get("bar.field3"),
+ false,
+ false,
+ bar_field3,
+ bar_positionIndices3));
+
+ String[] bar_test2_fieldNames4 = {"bar", "test2", "field4"};
+ Integer[] bar_test2_positionIndices4 = {9, 4, 0};
+ fooFieldNames.put("bar.test2.field4", bar_test2_fieldNames4);
+ fooPositionIndices.put("bar.test2.field4", bar_test2_positionIndices4);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test2.field4",
+ fooTypes.get("bar.test2.field4"),
+ false,
+ false,
+ bar_test2_fieldNames4,
+ bar_test2_positionIndices4));
+
+ String[] bar_test2_fieldNames5 = {"bar", "test2", "field5"};
+ Integer[] bar_test2_positionIndices5 = {9, 4, 1};
+ fooFieldNames.put("bar.test2.field5", bar_test2_fieldNames5);
+ fooPositionIndices.put("bar.test2.field5", bar_test2_positionIndices5);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test2.field5",
+ fooTypes.get("bar.test2.field5"),
+ false,
+ false,
+ bar_test2_fieldNames5,
+ bar_test2_positionIndices5));
+
+ String[] bar_test2_fieldNames6 = {"bar", "test2", "field6"};
+ Integer[] bar_test2_positionIndices6 = {9, 4, 2};
+ fooFieldNames.put("bar.test2.field6", bar_test2_fieldNames6);
+ fooPositionIndices.put("bar.test2.field6", bar_test2_positionIndices6);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test2.field6",
+ fooTypes.get("bar.test2.field6"),
+ false,
+ false,
+ bar_test2_fieldNames6,
+ bar_test2_positionIndices6));
+
+ String[] bar_test2_foobar_fieldNames1 = {"bar", "test2", "foobar", "field1"};
+ Integer[] bar_test2_foobar_positionIndices1 = {9, 4, 6, 0};
+ fooFieldNames.put("bar.test2.foobar.field1", bar_test2_foobar_fieldNames1);
+ fooPositionIndices.put("bar.test2.foobar.field1", bar_test2_foobar_positionIndices1);
+ fooColumnHandles.add(new PulsarColumnHandle(pulsarConnectorId.toString(),
+ "bar.test2.foobar.field1",
+ fooTypes.get("bar.test2.foobar.field1"),
+ false,
+ false,
+ bar_test2_foobar_fieldNames1,
+ bar_test2_foobar_positionIndices1));
+
+ fooColumnHandles.addAll(PulsarInternalColumn.getInternalFields().stream().map(
+ new Function<PulsarInternalColumn, PulsarColumnHandle>() {
+ @Override
+ public PulsarColumnHandle apply(PulsarInternalColumn pulsarInternalColumn) {
+ return pulsarInternalColumn.getColumnHandle(pulsarConnectorId.toString(), false);
+ }
+ }).collect(Collectors.toList()));
+
+ splits = new HashMap<>();
+
+ List<TopicName> allTopics = new LinkedList<>();
+ allTopics.addAll(topicNames);
+ allTopics.addAll(partitionedTopicNames);
+
+ for (TopicName topicName : allTopics) {
+ splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
+ topicName.getNamespace(), topicName.getLocalName(),
+ topicsToNumEntries.get(topicName.getSchemaName()),
+ new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
+ topicsToSchemas.get(topicName.getSchemaName()).getType(),
+ 0, topicsToNumEntries.get(topicName.getSchemaName()),
+ 0, 0, TupleDomain.all()));
+ }
+
+ fooFunctions = new HashMap<>();
+
+ fooFunctions.put("field1", integer -> integer);
+ fooFunctions.put("field2", integer -> String.valueOf(integer));
+ fooFunctions.put("field3", integer -> integer.floatValue());
+ fooFunctions.put("field4", integer -> integer.doubleValue());
+ fooFunctions.put("field5", integer -> integer % 2 == 0);
+ fooFunctions.put("field6", integer -> integer.longValue());
+ fooFunctions.put("timestamp", integer -> System.currentTimeMillis());
+ fooFunctions.put("time", integer -> {
+ LocalTime now = LocalTime.now(ZoneId.systemDefault());
+ return now.toSecondOfDay() * 1000;
+ });
+ fooFunctions.put("date", integer -> {
+ LocalDate localDate = LocalDate.now();
+ LocalDate epoch = LocalDate.ofEpochDay(0);
+ return Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+ });
+ fooFunctions.put("bar.field1", integer -> integer % 3 == 0 ? null : integer + 1);
+ fooFunctions.put("bar.field2", integer -> integer % 2 == 0 ? null : String.valueOf(integer + 2));
+ fooFunctions.put("bar.field3", integer -> integer + 3.0f);
+
+ fooFunctions.put("bar.test.field4", integer -> integer + 1.0);
+ fooFunctions.put("bar.test.field5", integer -> (integer + 1) % 2 == 0);
+ fooFunctions.put("bar.test.field6", integer -> integer + 10L);
+ fooFunctions.put("bar.test.foobar.field1", integer -> integer % 3);
+
+ fooFunctions.put("bar.test2.field4", integer -> integer + 2.0);
+ fooFunctions.put("bar.test2.field5", integer -> (integer + 1) % 32 == 0);
+ fooFunctions.put("bar.test2.field6", integer -> integer + 15L);
+ fooFunctions.put("bar.test2.foobar.field1", integer -> integer % 3);
+
+
+ } catch (Throwable e) {
+ System.out.println("Error: " + e);
+ System.out.println("Stacktrace: " + Arrays.asList(e.getStackTrace()));
+ throw e;
}
}
@@ -550,21 +795,45 @@ public abstract class TestPulsarConnector {
List<Entry> entries = new LinkedList<>();
for (int i = 0; i < readEntries; i++) {
+ Foo.Bar foobar = new Foo.Bar();
+ foobar.field1 = (int) fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+ Boo boo1 = new Boo();
+ boo1.field4 = (double) fooFunctions.get("bar.test.field4").apply(count);
+ boo1.field5 = (boolean) fooFunctions.get("bar.test.field5").apply(count);
+ boo1.field6 = (long) fooFunctions.get("bar.test.field6").apply(count);
+ boo1.foo = new Foo();
+ boo1.boo = null;
+ boo1.bar = new Bar();
+ boo1.foobar = foobar;
+
+ Boo boo2 = new Boo();
+ boo2.field4 = (double) fooFunctions.get("bar.test2.field4").apply(count);
+ boo2.field5 = (boolean) fooFunctions.get("bar.test2.field5").apply(count);
+ boo2.field6 = (long) fooFunctions.get("bar.test2.field6").apply(count);
+ boo2.foo = new Foo();
+ boo2.boo = boo1;
+ boo2.bar = new Bar();
+ boo2.foobar = foobar;
+
+ TestPulsarConnector.Bar bar = new TestPulsarConnector.Bar();
+ bar.field1 = fooFunctions.get("bar.field1").apply(count) == null ? null : (int) fooFunctions.get("bar.field1").apply(count);
+ bar.field2 = fooFunctions.get("bar.field2").apply(count) == null ? null : (String) fooFunctions.get("bar.field2").apply(count);
+ bar.field3 = (float) fooFunctions.get("bar.field3").apply(count);
+ bar.test = boo1;
+ bar.test2 = count % 2 == 0 ? null : boo2;
+
Foo foo = new Foo();
- foo.field1 = count;
- foo.field2 = String.valueOf(count);
- foo.field3 = count;
- foo.field4 = count;
- foo.field5 = count % 2 == 0;
- foo.field6 = count;
- foo.timestamp = System.currentTimeMillis();
-
- LocalTime now = LocalTime.now(ZoneId.systemDefault());
- foo.time = now.toSecondOfDay() * 1000;
-
- LocalDate localDate = LocalDate.now();
- LocalDate epoch = LocalDate.ofEpochDay(0);
- foo.date = Math.toIntExact(ChronoUnit.DAYS.between(epoch, localDate));
+ foo.field1 = (int) fooFunctions.get("field1").apply(count);
+ foo.field2 = (String) fooFunctions.get("field2").apply(count);
+ foo.field3 = (float) fooFunctions.get("field3").apply(count);
+ foo.field4 = (double) fooFunctions.get("field4").apply(count);
+ foo.field5 = (boolean) fooFunctions.get("field5").apply(count);
+ foo.field6 = (long) fooFunctions.get("field6").apply(count);
+ foo.timestamp = (long) fooFunctions.get("timestamp").apply(count);
+ foo.time = (int) fooFunctions.get("time").apply(count);
+ foo.date = (int) fooFunctions.get("date").apply(count);
+ foo.bar = bar;
PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
.setProducerName("test-producer").setSequenceId(positions.get(topic))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 7726763..17ce416 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -85,16 +84,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
@Test
- public void testGetTableLayouts() {
-
- }
-
- @Test
- public void testGetTableLayout() {
-
- }
-
- @Test
public void testGetTableMetadata() {
List<TopicName> allTopics = new LinkedList<>();
@@ -116,12 +105,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
Assert.assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
Assert.assertEquals(tableMetadata.getColumns().size(),
- Foo.class.getDeclaredFields().length + PulsarInternalColumn.getInternalFields().size());
+ fooColumnHandles.size());
- List<String> fieldNames = new LinkedList<>();
- for (Field field : Foo.class.getDeclaredFields()) {
- fieldNames.add(field.getName());
- }
+ List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
fieldNames.add(internalField.getName());
@@ -278,10 +264,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
Map<String, ColumnHandle> columnHandleMap
= new HashMap<>(this.pulsarMetadata.getColumnHandles(mock(ConnectorSession.class), pulsarTableHandle));
- List<String> fieldNames = new LinkedList<>();
- for (Field field : Foo.class.getDeclaredFields()) {
- fieldNames.add(field.getName());
- }
+ List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
fieldNames.add(internalField.getName());
@@ -298,9 +281,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
Schema schema = new Schema.Parser().parse(new String(topicsToSchemas.get(TOPIC_1.getSchemaName())
.getSchema()));
Assert.assertEquals(pulsarColumnHandle.getConnectorId(), pulsarConnectorId.toString());
- Assert.assertEquals(pulsarColumnHandle.getName(), schema.getField(field).name());
- Assert.assertNotNull(pulsarColumnHandle.getPositionIndex());
- Assert.assertEquals(pulsarColumnHandle.getPositionIndex().intValue(), schema.getField(field).pos());
+ Assert.assertEquals(pulsarColumnHandle.getName(), field);
+ Assert.assertEquals(pulsarColumnHandle.getPositionIndices(), fooPositionIndices.get(field));
+ Assert.assertEquals(pulsarColumnHandle.getFieldNames(), fooFieldNames.get(field));
Assert.assertEquals(pulsarColumnHandle.getType(), fooTypes.get(field));
Assert.assertEquals(pulsarColumnHandle.isHidden(), false);
}
@@ -319,13 +302,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
List<ColumnMetadata> columnMetadataList
= tableColumnsMap.get(new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()));
Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
- PulsarInternalColumn.getInternalFields().size());
+ Assert.assertEquals(columnMetadataList.size(),
+ fooColumnHandles.size());
- List<String> fieldNames = new LinkedList<>();
- for (Field field : Foo.class.getDeclaredFields()) {
- fieldNames.add(field.getName());
- }
+ List<String> fieldNames = new LinkedList<>(fooFieldNames.keySet());
for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
fieldNames.add(internalField.getName());
@@ -345,13 +325,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_2.getNamespace(), TOPIC_2.getLocalName()));
Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
- PulsarInternalColumn.getInternalFields().size());
+ Assert.assertEquals(columnMetadataList.size(),
+ fooColumnHandles.size());
- fieldNames = new LinkedList<>();
- for (Field field : Foo.class.getDeclaredFields()) {
- fieldNames.add(field.getName());
- }
+ fieldNames = new LinkedList<>(fooFieldNames.keySet());
for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
fieldNames.add(internalField.getName());
@@ -377,13 +354,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
Assert.assertEquals(tableColumnsMap.size(), 1);
columnMetadataList = tableColumnsMap.get(new SchemaTableName(TOPIC_4.getNamespace(), TOPIC_4.getLocalName()));
Assert.assertNotNull(columnMetadataList);
- Assert.assertEquals(columnMetadataList.size(), Foo.class.getDeclaredFields().length,
- PulsarInternalColumn.getInternalFields().size());
+ Assert.assertEquals(columnMetadataList.size(),
+ fooColumnHandles.size());
- fieldNames = new LinkedList<>();
- for (Field field : Foo.class.getDeclaredFields()) {
- fieldNames.add(field.getName());
- }
+ fieldNames = new LinkedList<>(fooFieldNames.keySet());
for (PulsarInternalColumn internalField : PulsarInternalColumn.getInternalFields()) {
fieldNames.add(internalField.getName());
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 74126a6..1b323a2 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -19,16 +19,14 @@
package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
-import io.airlift.slice.Slice;
import org.apache.pulsar.common.naming.TopicName;
import org.testng.Assert;
import org.testng.annotations.Test;
-import java.nio.charset.Charset;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
-import static com.facebook.presto.spi.type.BigintType.BIGINT;
-
@Test(singleThreaded = true)
public class TestPulsarRecordCursor extends TestPulsarConnector {
@@ -44,31 +42,81 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
PulsarRecordCursor pulsarRecordCursor = entry.getValue();
TopicName topicName = entry.getKey();
- long count = 0L;
+ int count = 0;
while (pulsarRecordCursor.advanceNextPosition()) {
+ List<String> columnsSeen = new LinkedList<>();
for (int i = 0; i < fooColumnHandles.size(); i++) {
-
if (pulsarRecordCursor.isNull(i)) {
-
+ columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
- if (fooColumnHandles.get(i).getType().getJavaType() == long.class) {
- if (fooColumnHandles.get(i).getType() == BIGINT) {
- Assert.assertEquals(pulsarRecordCursor.getLong(i), count);
- }
- } else if (fooColumnHandles.get(i).getType().getJavaType() == boolean.class) {
- Assert.assertEquals(pulsarRecordCursor.getBoolean(i), count % 2 == 0);
- } else if (fooColumnHandles.get(i).getType().getJavaType() == double.class) {
- Assert.assertEquals(pulsarRecordCursor.getDouble(i), Long.valueOf(count).doubleValue());
- } else if (fooColumnHandles.get(i).getType().getJavaType() == Slice.class) {
- if (!fooColumnHandles.get(i).isInternal()) {
- Assert.assertEquals(pulsarRecordCursor.getSlice(i).toStringUtf8().getBytes(),
- Charset.forName("UTF-8").encode(String.valueOf(count)).array());
- }
+ if (fooColumnHandles.get(i).getName().equals("field1")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("field1").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("field2")) {
+ Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("field2").apply(count)).getBytes());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("field3")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("field3").apply(count)).floatValue()));
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("field4")) {
+ Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("field4").apply(count)).doubleValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("field5")) {
+ Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("field5").apply(count)).booleanValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("field6")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("field6").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("timestamp")) {
+ pulsarRecordCursor.getLong(i);
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("time")) {
+ pulsarRecordCursor.getLong(i);
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("date")) {
+ pulsarRecordCursor.getLong(i);
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.field1")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.field1").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.field2")) {
+ Assert.assertEquals(pulsarRecordCursor.getSlice(i).getBytes(), ((String) fooFunctions.get("bar.field2").apply(count)).getBytes());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.field3")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), Float.floatToIntBits(((Float) fooFunctions.get("bar.field3").apply(count)).floatValue()));
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test.field4")) {
+ Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test.field4").apply(count)).doubleValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test.field5")) {
+ Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test.field5").apply(count)).booleanValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test.field6")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test.field6").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test.foobar.field1")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test.foobar.field1").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field4")) {
+ Assert.assertEquals(pulsarRecordCursor.getDouble(i), ((Double) fooFunctions.get("bar.test2.field4").apply(count)).doubleValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field5")) {
+ Assert.assertEquals(pulsarRecordCursor.getBoolean(i), ((Boolean) fooFunctions.get("bar.test2.field5").apply(count)).booleanValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test2.field6")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Long) fooFunctions.get("bar.test2.field6").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ } else if (fooColumnHandles.get(i).getName().equals("bar.test2.foobar.field1")) {
+ Assert.assertEquals(pulsarRecordCursor.getLong(i), ((Integer) fooFunctions.get("bar.test2.foobar.field1").apply(count)).longValue());
+ columnsSeen.add(fooColumnHandles.get(i).getName());
} else {
- Assert.fail("Unknown type: " + fooColumnHandles.get(i).getType().getJavaType());
+ if (PulsarInternalColumn.getInternalFieldsMap().containsKey(fooColumnHandles.get(i).getName())) {
+ columnsSeen.add(fooColumnHandles.get(i).getName());
+ }
}
}
}
+ Assert.assertEquals(columnsSeen.size(), fooColumnHandles.size());
count++;
}
Assert.assertEquals(count, topicsToNumEntries.get(topicName.getSchemaName()).longValue());