You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/12/25 06:40:35 UTC

[incubator-pinot] branch make-objects-serializable created (now a6744d8)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch make-objects-serializable
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at a6744d8  Make required interfaces or classes serializable for spark

This branch includes the following new commits:

     new a6744d8  Make required interfaces or classes serializable for spark

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Make required interfaces or classes serializable for spark

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch make-objects-serializable
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a6744d89f8a69a0487d93a445582fed5cf36a076
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Thu Dec 24 22:39:47 2020 -0800

    Make required interfaces or classes serializable for spark
---
 .../core/data/partition/PartitionFunction.java     |  5 ++-
 .../data/recordtransformer/RecordTransformer.java  |  3 +-
 .../generator/SegmentGeneratorConfig.java          |  3 +-
 .../segment/creator/ColumnIndexCreationInfo.java   |  3 +-
 .../core/segment/creator/ColumnStatistics.java     |  3 +-
 .../segment/creator/SegmentCreationDataSource.java |  3 +-
 .../pinot/core/segment/creator/SegmentCreator.java |  3 +-
 .../creator/SegmentIndexCreationDriver.java        |  3 +-
 .../segment/creator/SegmentIndexCreationInfo.java  |  5 ++-
 .../creator/SegmentPreIndexStatsContainer.java     |  5 ++-
 .../pinot/core/segment/memory/PinotByteBuffer.java | 15 +++----
 .../core/segment/name/SegmentNameGenerator.java    |  3 +-
 .../apache/pinot/spi/config/BaseJsonConfig.java    |  3 +-
 .../java/org/apache/pinot/spi/data/FieldSpec.java  |  3 +-
 .../pinot/spi/data/IngestionSchemaValidator.java   |  4 +-
 .../java/org/apache/pinot/spi/data/Schema.java     | 46 +++++++++++++++++++++-
 .../apache/pinot/spi/data/TimeGranularitySpec.java |  5 +--
 .../apache/pinot/spi/data/readers/GenericRow.java  |  3 +-
 .../pinot/spi/data/readers/RecordExtractor.java    |  3 +-
 .../pinot/spi/data/readers/RecordReader.java       |  3 +-
 20 files changed, 95 insertions(+), 29 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
index 8c8c18d..b3dec8d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/partition/PartitionFunction.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.core.data.partition;
 
+import java.io.Serializable;
+
+
 /**
  * Interface for partition function.
  */
-public interface PartitionFunction {
+public interface PartitionFunction extends Serializable {
 
   /**
    * Method to compute and return partition id for the given value.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
index d397578..ebffe54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/RecordTransformer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.recordtransformer;
 
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
@@ -25,7 +26,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 /**
  * The record transformer which takes a {@link GenericRow} and transform it based on some custom rules.
  */
-public interface RecordTransformer {
+public interface RecordTransformer extends Serializable {
 
   /**
    * Transforms a record based on some custom rules.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 1d3324f..a5ca445 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.indexsegment.generator;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,7 +58,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Configuration properties used in the creation of index segments.
  */
-public class SegmentGeneratorConfig {
+public class SegmentGeneratorConfig implements Serializable {
   public enum TimeColumnType {
     EPOCH, SIMPLE_DATE
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
index 986d07d..cd0eace 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnIndexCreationInfo.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import java.util.Set;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.core.common.Constants;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 
 
-public class ColumnIndexCreationInfo {
+public class ColumnIndexCreationInfo implements Serializable {
   private final boolean createDictionary;
   private final boolean useVarLengthDictionary;
   private final boolean isAutoGenerated;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
index 4f5636e..bbfae0a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/ColumnStatistics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import java.util.Set;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 
@@ -25,7 +26,7 @@ import org.apache.pinot.core.data.partition.PartitionFunction;
 /**
  * An interface to read the column statistics from statistics collectors.
  */
-public interface ColumnStatistics {
+public interface ColumnStatistics extends Serializable {
   /**
    * @return Minimum value of the column
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
index 5bcac0d..151fe10 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreationDataSource.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
 import org.apache.pinot.spi.data.readers.RecordReader;
 
 
 /**
  * Data source used to build segments.
  */
-public interface SegmentCreationDataSource {
+public interface SegmentCreationDataSource extends Serializable {
   SegmentPreIndexStatsContainer gatherStats(StatsCollectorConfig statsCollectorConfig);
 
   RecordReader getRecordReader();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..2616dbf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.creator;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pinot.spi.data.Schema;
@@ -31,7 +32,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 /**
  * Interface for segment creators, which create an index over a set of rows and writes the resulting index to disk.
  */
-public interface SegmentCreator extends Closeable {
+public interface SegmentCreator extends Closeable, Serializable {
 
   /**
    * Initializes the segment creation.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
index d830fa6..5c608c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationDriver.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.segment.creator;
 
 import java.io.File;
+import java.io.Serializable;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
 
@@ -29,7 +30,7 @@ import org.apache.pinot.spi.data.IngestionSchemaValidator;
  * Nov 6, 2014
  */
 
-public interface SegmentIndexCreationDriver {
+public interface SegmentIndexCreationDriver extends Serializable {
   /**
    * Configures the segment generator with the given segment generator configuration, which contains the input file
    * location, format, schema and other necessary information to create an index segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
index 8ca1e73..ae57f4c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentIndexCreationInfo.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pinot.core.segment.creator;
 
-public class SegmentIndexCreationInfo {
+import java.io.Serializable;
+
+
+public class SegmentIndexCreationInfo implements Serializable {
   private int totalDocs;
 
   public int getTotalDocs() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
index e56c027..1d3ec21 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentPreIndexStatsContainer.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.core.segment.creator;
 
+import java.io.Serializable;
+
+
 /**
  * Container for per-column stats for a segment
  */
-public interface SegmentPreIndexStatsContainer {
+public interface SegmentPreIndexStatsContainer extends Serializable {
   ColumnStatistics getColumnProfileFor(String column)
       throws Exception;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
index 87d5feb..d1136f8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/memory/PinotByteBuffer.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.segment.memory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.MappedByteBuffer;
@@ -231,7 +232,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       }
     } else {
       ByteBuffer duplicate = _buffer.duplicate();
-      duplicate.position(intOffset);
+      ((Buffer) duplicate).position(intOffset);
       duplicate.get(buffer, destOffset, size);
     }
   }
@@ -243,7 +244,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     int start = (int) offset;
     int end = start + (int) size;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position(start).limit(end);
+    ((Buffer) duplicate).position(start).limit(end);
     buffer.readFrom(destOffset, duplicate);
   }
 
@@ -258,7 +259,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       }
     } else {
       ByteBuffer duplicate = _buffer.duplicate();
-      duplicate.position(intOffset);
+      ((Buffer) duplicate).position(intOffset);
       duplicate.put(buffer, srcOffset, size);
     }
   }
@@ -267,7 +268,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
   public void readFrom(long offset, ByteBuffer buffer) {
     assert offset <= Integer.MAX_VALUE;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position((int) offset);
+    ((Buffer) duplicate).position((int) offset);
     duplicate.put(buffer);
   }
 
@@ -280,7 +281,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
       ByteBuffer duplicate = _buffer.duplicate();
       int start = (int) offset;
       int end = start + (int) size;
-      duplicate.position(start).limit(end);
+      ((Buffer) duplicate).position(start).limit(end);
       randomAccessFile.getChannel().read(duplicate, srcOffset);
     }
   }
@@ -300,7 +301,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     assert start <= end;
     assert end <= Integer.MAX_VALUE;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position((int) start).limit((int) end);
+    ((Buffer) duplicate).position((int) start).limit((int) end);
     ByteBuffer buffer = duplicate.slice();
     buffer.order(byteOrder);
     return new PinotByteBuffer(buffer, false, false);
@@ -312,7 +313,7 @@ public class PinotByteBuffer extends PinotDataBuffer {
     int start = (int) offset;
     int end = start + size;
     ByteBuffer duplicate = _buffer.duplicate();
-    duplicate.position(start).limit(end);
+    ((Buffer) duplicate).position(start).limit(end);
     ByteBuffer buffer = duplicate.slice();
     buffer.order(byteOrder);
     return buffer;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
index eed203c..430ca99 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.core.segment.name;
 
 import com.google.common.base.Joiner;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 
 
 /**
  * Interface for segment name generator based on the segment sequence id and time range.
  */
-public interface SegmentNameGenerator {
+public interface SegmentNameGenerator extends Serializable {
   Joiner JOINER = Joiner.on('_').skipNulls();
 
   /**
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
index 33661f6..b2c5486 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/BaseJsonConfig.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import java.io.Serializable;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
@@ -30,7 +31,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public abstract class BaseJsonConfig {
+public abstract class BaseJsonConfig implements Serializable {
 
   public JsonNode toJsonNode() {
     return JsonUtils.objectToJsonNode(this);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
index b04ecb7..a393ca5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/FieldSpec.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.data;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.EqualityUtils;
@@ -38,7 +39,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  * <p>- <code>VirtualColumnProvider</code>: the virtual column provider to use for this field.
  */
 @SuppressWarnings("unused")
-public abstract class FieldSpec implements Comparable<FieldSpec> {
+public abstract class FieldSpec implements Comparable<FieldSpec>, Serializable {
   private static final int DEFAULT_MAX_LENGTH = 512;
 
   // TODO: revisit to see if we allow 0-length byte array
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..bdbcae3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.spi.data;
 
+import java.io.Serializable;
+
 
 /**
  * Validator to validate the schema between Pinot schema and input raw data schema
  */
-public interface IngestionSchemaValidator {
+public interface IngestionSchemaValidator extends Serializable {
 
   void init(Schema pinotSchema, String inputFilePath);
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 723a1c1..57b686b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -28,6 +28,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -59,7 +60,7 @@ import static org.apache.pinot.spi.data.FieldSpec.DataType.STRING;
  */
 @SuppressWarnings("unused")
 @JsonIgnoreProperties(ignoreUnknown = true)
-public final class Schema {
+public final class Schema implements Serializable {
   private static final Logger LOGGER = LoggerFactory.getLogger(Schema.class);
 
   private String _schemaName;
@@ -73,7 +74,7 @@ public final class Schema {
   private List<String> _primaryKeyColumns;
 
   // Json ignored fields
-  private transient final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
+  private final Map<String, FieldSpec> _fieldSpecMap = new HashMap<>();
   private transient final List<String> _dimensionNames = new ArrayList<>();
   private transient final List<String> _metricNames = new ArrayList<>();
   private transient final List<String> _dateTimeNames = new ArrayList<>();
@@ -803,4 +804,45 @@ public final class Schema {
     }
     return outerFunction;
   }
+
+  public static void main(String[] args)
+      throws IOException {
+    String string = "{\n" + "  \"schemaName\" : \"internalTesting\",\n" + "  \"dimensionFieldSpecs\" : [ {\n"
+        + "    \"name\" : \"actionTime\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"minutesSinceEpoch\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"hoursSinceEpoch\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"weeksSinceEpochSunday\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"actorId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"companyId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"occupationId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"profileIndustryId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"regionId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"referrer\",\n" + "    \"dataType\" : \"STRING\"\n" + "  }, {\n"
+        + "    \"name\" : \"socialGestureId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"authorId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"shareId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"activityId\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"shareCreationTimestamp\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"isFollowing\",\n" + "    \"dataType\" : \"STRING\",\n"
+        + "    \"defaultNullValue\" : \"false\"\n" + "  }, {\n" + "    \"name\" : \"isConnected\",\n"
+        + "    \"dataType\" : \"STRING\",\n" + "    \"defaultNullValue\" : \"false\"\n" + "  }, {\n"
+        + "    \"name\" : \"occupationSeniority\",\n" + "    \"dataType\" : \"INT\"\n" + "  } ],\n"
+        + "  \"metricFieldSpecs\" : [ {\n" + "    \"name\" : \"viewCount\",\n" + "    \"dataType\" : \"LONG\"\n"
+        + "  }, {\n" + "    \"name\" : \"likeCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"commentCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"reshareCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  }, {\n"
+        + "    \"name\" : \"replyToCommentCount\",\n" + "    \"dataType\" : \"LONG\"\n" + "  } ],\n"
+        + "  \"timeFieldSpec\" : {\n" + "    \"incomingGranularitySpec\" : {\n"
+        + "      \"name\" : \"daysSinceEpoch\",\n" + "      \"dataType\" : \"LONG\",\n"
+        + "      \"timeType\" : \"DAYS\"\n" + "    }\n" + "  }\n" + "}";
+    Schema schema = Schema.fromString(string);
+
+    System.out.println(schema);
+
+    System.out.println(schema.getColumnNames());
+    String timeColumnName = "daysSinceEpoch";
+    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+
+    System.out.println(dateTimeFieldSpec);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
index e97f537..51e43cf 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
@@ -21,12 +21,11 @@ package org.apache.pinot.spi.data;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
 
 
 /**
@@ -47,7 +46,7 @@ import org.joda.time.format.DateTimeFormat;
  */
 @SuppressWarnings("unused")
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TimeGranularitySpec {
+public class TimeGranularitySpec implements Serializable {
   private static final int DEFAULT_TIME_UNIT_SIZE = 1;
   private static final String DEFAULT_TIME_FORMAT = TimeFormat.EPOCH.toString();
   private static final String COLON_SEPARATOR = ":";
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5bddf02..72e5c52 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -49,7 +50,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
  *  MV: Object[] or List of Byte, Character, Short, Integer, Long, Float, Double, String
  *  We should not be using Boolean, Byte, Character and Short to keep it simple
  */
-public class GenericRow {
+public class GenericRow implements Serializable {
 
   /**
    * This key is used by a Decoder/RecordReader to handle 1 record to many records flattening.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
index 0df528b..a5386c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordExtractor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.data.readers;
 
+import java.io.Serializable;
 import java.util.Set;
 import javax.annotation.Nullable;
 
@@ -29,7 +30,7 @@ import javax.annotation.Nullable;
  * 3) Nested/Complex fields (e.g. json maps, avro maps, avro records) become Map<Object, Object>
  * @param <T> The format of the input record
  */
-public interface RecordExtractor<T> {
+public interface RecordExtractor<T> extends Serializable {
 
   /**
    * Initialize the record extractor with its config
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 15ba945..8486a9d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Set;
 import javax.annotation.Nullable;
 
@@ -30,7 +31,7 @@ import javax.annotation.Nullable;
  * Pinot segments will be generated from {@link GenericRow}s.
  * <p>NOTE: for time column, record reader should be able to read both incoming and outgoing time
  */
-public interface RecordReader extends Closeable {
+public interface RecordReader extends Closeable, Serializable {
 
   /**
    * Initializes the record reader with data file, schema and (optional) record reader config.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org