You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/12/25 06:40:36 UTC
[incubator-pinot] 01/01: Make required interfaces or classes serializable for spark
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch make-objects-serializable
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit a6744d89f8a69a0487d93a445582fed5cf36a076
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Dec 24 22:39:47 2020 -0800
Make required interfaces or classes serializable for spark
---
.../core/data/partition/PartitionFunction.java | 5 ++-
.../data/recordtransformer/RecordTransformer.java | 3 +-
.../generator/SegmentGeneratorConfig.java | 3 +-
.../segment/creator/ColumnIndexCreationInfo.java | 3 +-
.../core/segment/creator/ColumnStatistics.java | 3 +-
.../segment/creator/SegmentCreationDataSource.java | 3 +-
.../pinot/core/segment/creator/SegmentCreator.java | 3 +-
.../creator/SegmentIndexCreationDriver.java | 3 +-
.../segment/creator/SegmentIndexCreationInfo.java | 5 ++-
.../creator/SegmentPreIndexStatsContainer.java | 5 ++-
.../pinot/core/segment/memory/PinotByteBuffer.java | 15 +++----
.../core/segment/name/SegmentNameGenerator.java | 3 +-
.../apache/pinot/spi/config/BaseJsonConfig.java | 3 +-
.../java/org/apache/pinot/spi/data/FieldSpec.java | 3 +-
.../pinot/spi/data/IngestionSchemaValidator.java | 4 +-
.../java/org/apache/pinot/spi/data/Schema.java | 46 +++++++++++++++++++++-
.../apache/pinot/spi/data/TimeGranularitySpec.java | 5 +--
.../apache/pinot/spi/data/readers/GenericRow.java | 3 +-
.../pinot/spi/data/readers/RecordExtractor.java | 3 +-
.../pinot/spi/data/readers/RecordReader.java | 3 +-
20 files changed, 95 insertions(+), 29 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
index 8c8c18d..b3dec8d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.core.data.partition;
+import java.io.Serializable;
+
+
/**
* Interface for partition function.
*/
-public interface PartitionFunction {
+public interface PartitionFunction extends Serializable {
/**
* Method to compute and return partition id for the given value.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
index d397578..ebffe54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.data.recordtransformer;
+import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -25,7 +26,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
/**
* The record transformer which takes a {@link GenericRow} and transform it based on some custom rules.
*/
-public interface RecordTransformer {
+public interface RecordTransformer extends Serializable {
/**
* Transforms a record based on some custom rules.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 1d3324f..a5ca445 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.indexsegment.generator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory;
/**
* Configuration properties used in the creation of index segments.
*/
-public class SegmentGeneratorConfig {
+public class SegmentGeneratorConfig implements Serializable {
public enum TimeColumnType {
EPOCH, SIMPLE_DATE
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 986d07d..cd0eace 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.core.segment.creator;
+import java.io.Serializable;
import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.pinot.core.common.Constants;
import org.apache.pinot.core.data.partition.PartitionFunction;
-public class ColumnIndexCreationInfo {
+public class ColumnIndexCreationInfo implements Serializable {
private final boolean createDictionary;
private final boolean useVarLengthDictionary;
private final boolean isAutoGenerated;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
index 4f5636e..bbfae0a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.creator;
+import java.io.Serializable;
import java.util.Set;
import org.apache.pinot.core.data.partition.PartitionFunction;
@@ -25,7 +26,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
/**
* An interface to read the column statistics from statistics collectors.
*/
-public interface ColumnStatistics {
+public interface ColumnStatistics extends Serializable {
/**
* @return Minimum value of the column
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
index 5bcac0d..151fe10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.core.segment.creator;
+import java.io.Serializable;
import org.apache.pinot.spi.data.readers.RecordReader;
/**
* Data source used to build segments.
*/
-public interface SegmentCreationDataSource {
+public interface SegmentCreationDataSource extends Serializable {
SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig);
RecordReader getRecordReader();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..2616dbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.creator;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.pinot.spi.data.Schema;
@@ -31,7 +32,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
/**
* Interface for segment creators, which create an index over a set of rows and writes the resulting index to disk.
*/
-public interface SegmentCreator extends Closeable {
+public interface SegmentCreator extends Closeable, Serializable {
/**
* Initializes the segment creation.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
index d830fa6..5c608c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.segment.creator;
import java.io.File;
+import java.io.Serializable;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
@@ -29,7 +30,7 @@ import org.apache.pinot.spi.data.IngestionSchemaValidator;
* Nov 6, 2014
*/
-public interface SegmentIndexCreationDriver {
+public interface SegmentIndexCreationDriver extends Serializable {
/**
* Configures the segment generator with the given segment generator configuration, which contains the input file
* location, format, schema and other necessary information to create an index segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
index 8ca1e73..ae57f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.core.segment.creator;
-public class SegmentIndexCreationInfo {
+import java.io.Serializable;
+
+
+public class SegmentIndexCreationInfo implements Serializable {
private int totalDocs;
public int getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
index e56c027..1d3ec21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
@@ -18,10 +18,13 @@
*/
package org.apache.pinot.core.segment.creator;
+import java.io.Serializable;
+
+
/**
* Container for per-column stats for a segment
*/
-public interface SegmentPreIndexStatsContainer {
+public interface SegmentPreIndexStatsContainer extends Serializable {
ColumnStatistics getColumnProfileFor(String column)
throws Exception;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
index 87d5feb..d1136f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.memory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
@@ -231,7 +232,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
}
} else {
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position(intOffset);
+ ((Buffer) duplicate).position(intOffset);
duplicate.get(buffer, destOffset, size);
}
}
@@ -243,7 +244,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
int start = (int) offset;
int end = start + (int) size;
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position(start).limit(end);
+ ((Buffer) duplicate).position(start).limit(end);
buffer.readFrom(destOffset, duplicate);
}
@@ -258,7 +259,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
}
} else {
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position(intOffset);
+ ((Buffer) duplicate).position(intOffset);
duplicate.put(buffer, srcOffset, size);
}
}
@@ -267,7 +268,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
public void readFrom(long offset, ByteBuffer buffer) {
assert offset <= Integer.MAX_VALUE;
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position((int) offset);
+ ((Buffer) duplicate).position((int) offset);
duplicate.put(buffer);
}
@@ -280,7 +281,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
ByteBuffer duplicate = _buffer.duplicate();
int start = (int) offset;
int end = start + (int) size;
- duplicate.position(start).limit(end);
+ ((Buffer) duplicate).position(start).limit(end);
randomAccessFile.getChannel().read(duplicate, srcOffset);
}
}
@@ -300,7 +301,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
assert start <= end;
assert end <= Integer.MAX_VALUE;
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position((int) start).limit((int) end);
+ ((Buffer) duplicate).position((int) start).limit((int) end);
ByteBuffer buffer = duplicate.slice();
buffer.order(byteOrder);
return new PinotByteBuffer(buffer, false, false);
@@ -312,7 +313,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
int start = (int) offset;
int end = start + size;
ByteBuffer duplicate = _buffer.duplicate();
- duplicate.position(start).limit(end);
+ ((Buffer) duplicate).position(start).limit(end);
ByteBuffer buffer = duplicate.slice();
buffer.order(byteOrder);
return buffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
index eed203c..430ca99 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
@@ -19,13 +19,14 @@
package org.apache.pinot.core.segment.name;
import com.google.common.base.Joiner;
+import java.io.Serializable;
import javax.annotation.Nullable;
/**
* Interface for segment name generator based on the segment sequence id and time range.
*/
-public interface SegmentNameGenerator {
+public interface SegmentNameGenerator extends Serializable {
Joiner JOINER = Joiner.on('_').skipNulls();
/**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
index 33661f6..b2c5486 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
+import java.io.Serializable;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -30,7 +31,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
-public abstract class BaseJsonConfig {
+public abstract class BaseJsonConfig implements Serializable {
public JsonNode toJsonNode() {
return JsonUtils.objectToJsonNode(this);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..a393ca5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.data;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.EqualityUtils;
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
* <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field.
*/
@SuppressWarnings("unused")
-public abstract class FieldSpec implements Comparable<FieldSpec> {
+public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
private static final int DEFAULT_MAX_LENGTH = 512;
// TODO: revisit to see if we allow 0-length byte array
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..bdbcae3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.spi.data;
+import java.io.Serializable;
+
/**
* Validator to validate the schema between Pinot schema and input raw data schema
*/
-public interface IngestionSchemaValidator {
+public interface IngestionSchemaValidator extends Serializable {
void init(Schema pinotSchema, String inputFilePath);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 723a1c1..57b686b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -59,7 +60,7 @@ import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
*/
@SuppressWarnings("unused")
@JsonIgnoreProperties(ignoreUnknown = true)
-public final class Schema {
+public final class Schema implements Serializable {
private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
private String _schemaName;
@@ -73,7 +74,7 @@ public final class Schema {
private List<String> _primaryKeyColumns;
// Json ignored fields
- private transient final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
+ private final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
private transient final List<String> _dimensionNames = new ArrayList<>();
private transient final List<String> _metricNames = new ArrayList<>();
private transient final List<String> _dateTimeNames = new ArrayList<>();
@@ -803,4 +804,45 @@ public final class Schema {
}
return outerFunction;
}
+
+ public static void main(String[] args)
+ throws IOException {
+ String string = "{\n" + " \"schemaName\" : \"internalTesting\",\n" + " \"dimensionFieldSpecs\" : [ {\n"
+ + " \"name\" : \"actionTime\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"minutesSinceEpoch\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"hoursSinceEpoch\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"weeksSinceEpochSunday\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"actorId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"companyId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"occupationId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"profileIndustryId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"regionId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"referrer\",\n" + " \"dataType\" : \"STRING\"\n" + " }, {\n"
+ + " \"name\" : \"socialGestureId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"authorId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"shareId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"activityId\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"shareCreationTimestamp\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"isFollowing\",\n" + " \"dataType\" : \"STRING\",\n"
+ + " \"defaultNullValue\" : \"false\"\n" + " }, {\n" + " \"name\" : \"isConnected\",\n"
+ + " \"dataType\" : \"STRING\",\n" + " \"defaultNullValue\" : \"false\"\n" + " }, {\n"
+ + " \"name\" : \"occupationSeniority\",\n" + " \"dataType\" : \"INT\"\n" + " } ],\n"
+ + " \"metricFieldSpecs\" : [ {\n" + " \"name\" : \"viewCount\",\n" + " \"dataType\" : \"LONG\"\n"
+ + " }, {\n" + " \"name\" : \"likeCount\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"commentCount\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"reshareCount\",\n" + " \"dataType\" : \"LONG\"\n" + " }, {\n"
+ + " \"name\" : \"replyToCommentCount\",\n" + " \"dataType\" : \"LONG\"\n" + " } ],\n"
+ + " \"timeFieldSpec\" : {\n" + " \"incomingGranularitySpec\" : {\n"
+ + " \"name\" : \"daysSinceEpoch\",\n" + " \"dataType\" : \"LONG\",\n"
+ + " \"timeType\" : \"DAYS\"\n" + " }\n" + " }\n" + "}";
+ Schema schema = Schema.fromString(string);
+
+ System.out.println(schema);
+
+ System.out.println(schema.getColumnNames());
+ String timeColumnName = "daysSinceEpoch";
+ DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+
+ System.out.println(dateTimeFieldSpec);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
index e97f537..51e43cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
@@ -21,12 +21,11 @@ package org.apache.pinot.spi.data;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.EqualityUtils;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
/**
@@ -47,7 +46,7 @@ import org.joda.time.format.DateTimeFormat;
*/
@SuppressWarnings("unused")
@JsonIgnoreProperties(ignoreUnknown = true)
-public class TimeGranularitySpec {
+public class TimeGranularitySpec implements Serializable {
private static final int DEFAULT_TIME_UNIT_SIZE = 1;
private static final String DEFAULT_TIME_FORMAT = TimeFormat.EPOCH.toString();
private static final String COLON_SEPARATOR = ":";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5bddf02..72e5c52 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -49,7 +50,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
* MV: Object[] or List of Byte, Character, Short, Integer, Long, Float, Double, String
* We should not be using Boolean, Byte, Character and Short to keep it simple
*/
-public class GenericRow {
+public class GenericRow implements Serializable {
/**
* This key is used by a Decoder/RecordReader to handle 1 record to many records flattening.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
index 0df528b..a5386c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.data.readers;
+import java.io.Serializable;
import java.util.Set;
import javax.annotation.Nullable;
@@ -29,7 +30,7 @@ import javax.annotation.Nullable;
* 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
* @param <T> The format of the input record
*/
-public interface RecordExtractor<T> {
+public interface RecordExtractor<T> extends Serializable {
/**
* Initialize the record extractor with its config
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 15ba945..8486a9d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Set;
import javax.annotation.Nullable;
@@ -30,7 +31,7 @@ import javax.annotation.Nullable;
* Pinot segments will be generated from {@link GenericRow}s.
* <p>NOTE: for time column, record reader should be able to read both incoming and outgoing time
*/
-public interface RecordReader extends Closeable {
+public interface RecordReader extends Closeable, Serializable {
/**
* Initializes the record reader with data file, schema and (optional) record reader config.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org