Posted to commits@pinot.apache.org by jl...@apache.org on 2020/12/25 06:40:36 UTC

[incubator-pinot] 01/01: Make required interfaces or classes serializable for spark

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch make-objects-serializable
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a6744d89f8a69a0487d93a445582fed5cf36a076
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Dec 24 22:39:47 2020 -0800

    Make required interfaces or classes serializable for spark
---
 .../core/data/partition/PartitionFunction.java     |  5 ++-
 .../data/recordtransformer/RecordTransformer.java  |  3 +-
 .../generator/SegmentGeneratorConfig.java          |  3 +-
 .../segment/creator/ColumnIndexCreationInfo.java   |  3 +-
 .../core/segment/creator/ColumnStatistics.java     |  3 +-
 .../segment/creator/SegmentCreationDataSource.java |  3 +-
 .../pinot/core/segment/creator/SegmentCreator.java |  3 +-
 .../creator/SegmentIndexCreationDriver.java        |  3 +-
 .../segment/creator/SegmentIndexCreationInfo.java  |  5 ++-
 .../creator/SegmentPreIndexStatsContainer.java     |  5 ++-
 .../pinot/core/segment/memory/PinotByteBuffer.java | 15 +++----
 .../core/segment/name/SegmentNameGenerator.java    |  3 +-
 .../apache/pinot/spi/config/BaseJsonConfig.java    |  3 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  3 +-
 .../pinot/spi/data/IngestionSchemaValidator.java   |  4 +-
 .../java/org/apache/pinot/spi/data/Schema.java     | 46 +++++++++++++++++++++-
 .../apache/pinot/spi/data/TimeGranularitySpec.java |  5 +--
 .../apache/pinot/spi/data/readers/GenericRow.java  |  3 +-
 .../pinot/spi/data/readers/RecordExtractor.java    |  3 +-
 .../pinot/spi/data/readers/RecordReader.java       |  3 +-
 20 files changed, 95 insertions(+), 29 deletions(-)
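
Background, not part of the commit: the types touched here are built on the Spark driver and captured by task closures during segment generation, so Spark must be able to Java-serialize them when shipping tasks to executors; a non-serializable captured object fails the job with "Task not serializable" caused by java.io.NotSerializableException. Below is a minimal sketch of that constraint, assuming a local Spark context -- ClosureSerializationSketch and SegmentConfigLike are hypothetical stand-ins, not Pinot classes.

    import java.io.Serializable;
    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class ClosureSerializationSketch {
      // Stand-in for a config-like object captured by a Spark closure. Without
      // "implements Serializable", the map() below fails with
      // "Task not serializable ... java.io.NotSerializableException".
      static class SegmentConfigLike implements Serializable {
        final String tableName;
        SegmentConfigLike(String tableName) {
          this.tableName = tableName;
        }
      }

      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "closure-sketch");
        SegmentConfigLike config = new SegmentConfigLike("internalTesting");
        // The lambda captures `config`; Spark serializes it along with the task,
        // which only works because SegmentConfigLike implements Serializable.
        JavaRDD<String> names = sc.parallelize(Arrays.asList("seg_0", "seg_1"))
            .map(seg -> config.tableName + "_" + seg);
        System.out.println(names.collect());
        sc.stop();
      }
    }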

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
index 8c8c18d..b3dec8d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.core.data.partition;
 
+import java.io.Serializable;
+
+
 /**
  * Interface for partition function.
  */
-public interface PartitionFunction {
+public interface PartitionFunction extends Serializable {
 
   /**
    * Method to compute and return partition id for the given value.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
index d397578..ebffe54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.recordtransformer;
 
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
@@ -25,7 +26,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 /**
  * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules.
  */
-public interface RecordTransformer {
+public interface RecordTransformer extends Serializable {
 
   /**
    * Transforms a record based on some custom rules.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 1d3324f..a5ca445 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.indexsegment.generator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Configuration properties used in the creation of index segments.
  */
-public class SegmentGeneratorConfig {
+public class SegmentGeneratorConfig implements Serializable {
   public enum TimeColumnType {
     EPOCH, SIMPLE_DATE
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 986d07d..cd0eace 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import java.util.Set;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.core.common.Constants;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 
 
-public class ColumnIndexCreationInfo {
+public class ColumnIndexCreationInfo implements Serializable {
   private final boolean createDictionary;
   private final boolean useVarLengthDictionary;
   private final boolean isAutoGenerated;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
index 4f5636e..bbfae0a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import java.util.Set;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 
@@ -25,7 +26,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 /**
  * An interface to read the column statistics from statistics collectors.
  */
-public interface ColumnStatistics {
+public interface ColumnStatistics extends Serializable {
   /**
    * @return Minimum value of the column
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
index 5bcac0d..151fe10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 /**
  * Data source used to build segments.
  */
-public interface SegmentCreationDataSource {
+public interface SegmentCreationDataSource extends Serializable {
   SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig);
 
   RecordReader getRecordReader();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..2616dbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.creator;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pinot.spi.data.Schema;
@@ -31,7 +32,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 /**
  * Interface for segment creators, which create an index over a set of rows and writes the resulting index to disk.
  */
-public interface SegmentCreator extends Closeable {
+public interface SegmentCreator extends Closeable, Serializable {
 
   /**
    * Initializes the segment creation.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
index d830fa6..5c608c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.creator;
 
 import java.io.File;
+import java.io.Serializable;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
 
@@ -29,7 +30,7 @@ import org.apache.pinot.spi.data.IngestionSchemaValidator;
  * Nov 6, 2014
  */
 
-public interface SegmentIndexCreationDriver {
+public interface SegmentIndexCreationDriver extends Serializable {
   /**
    * Configures the segment generator with the given segment generator configuration, which contains the input file
    * location, format, schema and other necessary information to create an index segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
index 8ca1e73..ae57f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.core.segment.creator;
 
-public class SegmentIndexCreationInfo {
+import java.io.Serializable;
+
+
+public class SegmentIndexCreationInfo implements Serializable {
   private int totalDocs;
 
   public int getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
index e56c027..1d3ec21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
+
+
 /**
  * Container for per-column stats for a segment
  */
-public interface SegmentPreIndexStatsContainer {
+public interface SegmentPreIndexStatsContainer extends Serializable {
   ColumnStatistics getColumnProfileFor(String column)
       throws Exception;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
index 87d5feb..d1136f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.memory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.MappedByteBuffer;
@@ -231,7 +232,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       }
     } else {
       ByteBuffer duplicate = _buffer.duplicate();
-      duplicate.position(intOffset);
+      ((Buffer) duplicate).position(intOffset);
       duplicate.get(buffer, destOffset, size);
     }
   }
@@ -243,7 +244,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     int start = (int) offset;
     int end = start + (int) size;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position(start).limit(end);
+    ((Buffer) duplicate).position(start).limit(end);
     buffer.readFrom(destOffset, duplicate);
   }
 
@@ -258,7 +259,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       }
     } else {
       ByteBuffer duplicate = _buffer.duplicate();
-      duplicate.position(intOffset);
+      ((Buffer) duplicate).position(intOffset);
       duplicate.put(buffer, srcOffset, size);
     }
   }
@@ -267,7 +268,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
   public void readFrom(long offset, ByteBuffer buffer) {
     assert offset <= Integer.MAX_VALUE;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position((int) offset);
+    ((Buffer) duplicate).position((int) offset);
     duplicate.put(buffer);
   }
 
@@ -280,7 +281,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       ByteBuffer duplicate = _buffer.duplicate();
       int start = (int) offset;
       int end = start + (int) size;
-      duplicate.position(start).limit(end);
+      ((Buffer) duplicate).position(start).limit(end);
       randomAccessFile.getChannel().read(duplicate, srcOffset);
     }
   }
@@ -300,7 +301,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     assert start <= end;
     assert end <= Integer.MAX_VALUE;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position((int) start).limit((int) end);
+    ((Buffer) duplicate).position((int) start).limit((int) end);
     ByteBuffer buffer = duplicate.slice();
     buffer.order(byteOrder);
     return new PinotByteBuffer(buffer, false, false);
@@ -312,7 +313,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     int start = (int) offset;
     int end = start + size;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position(start).limit(end);
+    ((Buffer) duplicate).position(start).limit(end);
     ByteBuffer buffer = duplicate.slice();
     buffer.order(byteOrder);
     return buffer;
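
The Buffer casts above are not explained in the commit message; they match the common workaround for the Java 9 covariant-return change: ByteBuffer overrides position(int)/limit(int) to return ByteBuffer, so code compiled on JDK 9+ (even with -target 8) links against overloads that a Java 8 runtime cannot resolve and fails with NoSuchMethodError. Casting the receiver to java.nio.Buffer pins the calls to Buffer.position(int)/Buffer.limit(int), which exist on both. A minimal sketch of the pattern, assuming that motivation:

    import java.nio.Buffer;
    import java.nio.ByteBuffer;

    public class BufferCastSketch {
      public static void main(String[] args) {
        ByteBuffer duplicate = ByteBuffer.allocate(16).duplicate();

        // Compiled on JDK 9+, this line links against a position(int) overload
        // returning ByteBuffer, which a Java 8 JVM does not have:
        // duplicate.position(4);

        // Portable form, resolving against java.nio.Buffer instead:
        ((Buffer) duplicate).position(4).limit(12);

        System.out.println(duplicate);  // java.nio.HeapByteBuffer[pos=4 lim=12 cap=16]
      }
    }
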
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
index eed203c..430ca99 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.segment.name;
 
 import com.google.common.base.Joiner;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 
 
 /**
  * Interface for segment name generator based on the segment sequence id and time range.
  */
-public interface SegmentNameGenerator {
+public interface SegmentNameGenerator extends Serializable {
   Joiner JOINER = Joiner.on('_').skipNulls();
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
index 33661f6..b2c5486 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import java.io.Serializable;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -30,7 +31,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public abstract class BaseJsonConfig {
+public abstract class BaseJsonConfig implements Serializable {
 
   public JsonNode toJsonNode() {
     return JsonUtils.objectToJsonNode(this);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..a393ca5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.data;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.EqualityUtils;
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field.
  */
 @SuppressWarnings("unused")
-public abstract class FieldSpec implements Comparable<FieldSpec> {
+public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
   private static final int DEFAULT_MAX_LENGTH = 512;
 
   // TODO: revisit to see if we allow 0-length byte array
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..bdbcae3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.spi.data;
 
+import java.io.Serializable;
+
 
 /**
  * Validator to validate the schema between Pinot schema and input raw data schema
  */
-public interface IngestionSchemaValidator {
+public interface IngestionSchemaValidator extends Serializable {
 
   void init(Schema pinotSchema, String inputFilePath);
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 723a1c1..57b686b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -59,7 +60,7 @@ import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
  */
 @SuppressWarnings("unused")
 @JsonIgnoreProperties(ignoreUnknown = true)
-public final class Schema {
+public final class Schema implements Serializable {
   private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
 
   private String _schemaName;
@@ -73,7 +74,7 @@ public final class Schema {
   private List<String> _primaryKeyColumns;
 
   // Json ignored fields
-  private transient final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
+  private final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
   private transient final List<String> _dimensionNames = new ArrayList<>();
   private transient final List<String> _metricNames = new ArrayList<>();
   private transient final List<String> _dateTimeNames = new ArrayList<>();
@@ -803,4 +804,45 @@ public final class Schema {
     }
     return outerFunction;
   }
+
+  public static void main(String[] args)
+      throws IOException {
+    String string = "{\n" + "  \"schemaName\" : \"internalTesting\",\n" + "  \"dimensionFieldSpecs\" : [ {\n"
+        + "    \"name\" : \"actionTime\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"minutesSinceEpoch\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"hoursSinceEpoch\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"weeksSinceEpochSunday\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"actorId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"companyId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"occupationId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"profileIndustryId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"regionId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"referrer\",\n" + "    \"dataType\" : \"STRING\"\n" + "  }, {\n"
+        + "    \"name\" : \"socialGestureId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"authorId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"shareId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"activityId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"shareCreationTimestamp\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"isFollowing\",\n" + "    \"dataType\" : \"STRING\",\n"
+        + "    \"defaultNullValue\" : \"false\"\n" + "  }, {\n" + "    \"name\" : \"isConnected\",\n"
+        + "    \"dataType\" : \"STRING\",\n" + "    \"defaultNullValue\" : \"false\"\n" + "  }, {\n"
+        + "    \"name\" : \"occupationSeniority\",\n" + "    \"dataType\" : \"INT\"\n" + "  } ],\n"
+        + "  \"metricFieldSpecs\" : [ {\n" + "    \"name\" : \"viewCount\",\n" + "    \"dataType\" : \"LONG\"\n"
+        + "  }, {\n" + "    \"name\" : \"likeCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"commentCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"reshareCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"replyToCommentCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  } ],\n"
+        + "  \"timeFieldSpec\" : {\n" + "    \"incomingGranularitySpec\" : {\n"
+        + "      \"name\" : \"daysSinceEpoch\",\n" + "      \"dataType\" : \"LONG\",\n"
+        + "      \"timeType\" : \"DAYS\"\n" + "    }\n" + "  }\n" + "}";
+    Schema schema = Schema.fromString(string);
+
+    System.out.println(schema);
+
+    System.out.println(schema.getColumnNames());
+    String timeColumnName = "daysSinceEpoch";
+    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+
+    System.out.println(dateTimeFieldSpec);
+  }
 }
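
In Schema, dropping `transient` from _fieldSpecMap keeps the field-spec lookup map when a Schema instance is Java-serialized to Spark executors: transient fields are skipped by serialization and their initializers do not re-run on deserialization, so they come back null. A minimal stand-alone sketch of that behavior -- TransientFieldSketch and SchemaLike are hypothetical stand-ins, not the Pinot Schema:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    public class TransientFieldSketch {
      static class SchemaLike implements Serializable {
        // Serialized and restored with the object.
        final Map<String, String> fieldSpecMap = new HashMap<>();
        // Skipped by serialization; restored as null, not as an empty map.
        transient final Map<String, String> indexMap = new HashMap<>();
      }

      public static void main(String[] args) throws Exception {
        SchemaLike original = new SchemaLike();
        original.fieldSpecMap.put("daysSinceEpoch", "LONG");
        original.indexMap.put("daysSinceEpoch", "LONG");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
          out.writeObject(original);
        }
        SchemaLike copy;
        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
          copy = (SchemaLike) in.readObject();
        }

        System.out.println(copy.fieldSpecMap);  // {daysSinceEpoch=LONG}
        System.out.println(copy.indexMap);      // null -- transient state is lost
      }
    }
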
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
index e97f537..51e43cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
@@ -21,12 +21,11 @@ package org.apache.pinot.spi.data;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
 
 
 /**
@@ -47,7 +46,7 @@ import org.joda.time.format.DateTimeFormat;
  */
 @SuppressWarnings("unused")
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TimeGranularitySpec {
+public class TimeGranularitySpec implements Serializable {
   private static final int DEFAULT_TIME_UNIT_SIZE = 1;
   private static final String DEFAULT_TIME_FORMAT = TimeFormat.EPOCH.toString();
   private static final String COLON_SEPARATOR = ":";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5bddf02..72e5c52 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,7 +50,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  *  MV: Object[] or List of Byte, Character, Short, Integer, Long, Float, Double, String
  *  We should not be using Boolean, Byte, Character and Short to keep it simple
  */
-public class GenericRow {
+public class GenericRow implements Serializable {
 
   /**
    * This key is used by a Decoder/RecordReader to handle 1 record to many records flattening.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
index 0df528b..a5386c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import java.io.Serializable;
 import java.util.Set;
 import javax.annotation.Nullable;
 
@@ -29,7 +30,7 @@ import javax.annotation.Nullable;
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
  */
-public interface RecordExtractor<T> {
+public interface RecordExtractor<T> extends Serializable {
 
   /**
    * Initialize the record extractor with its config
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 15ba945..8486a9d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Set;
 import javax.annotation.Nullable;
 
@@ -30,7 +31,7 @@ import javax.annotation.Nullable;
  * Pinot segments will be generated from {@link GenericRow}s.
  * <p>NOTE: for time column, record reader should be able to read both incoming and outgoing time
  */
-public interface RecordReader extends Closeable {
+public interface RecordReader extends Closeable, Serializable {
 
   /**
    * Initializes the record reader with data file, schema and (optional) record reader config.

