You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2018/11/14 02:39:42 UTC

[incubator-pinot] branch fix_time_conversion created (now e5389c1)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch fix_time_conversion
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at e5389c1  Fix the bug where realtime time conversion is skipped when incoming and outgoing time name are the same

This branch includes the following new commits:

     new e5389c1  Fix the bug where realtime time conversion is skipped when incoming and outgoing time name are the same

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Fix the bug where realtime time conversion is skipped when incoming and outgoing time name are the same

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch fix_time_conversion
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e5389c15ef27d66cd058e8b65e0a0931a204cc0b
Author: Xiaotian (Jackie) Jiang <xa...@linkedin.com>
AuthorDate: Tue Nov 13 17:55:51 2018 -0800

    Fix the bug where realtime time conversion is skipped when incoming and outgoing time name are the same
    
    1. Always perform time transformation for REALTIME segment consumption
    2. Never perform time transformation for OFFLINE segment generation
      - Time transformation is disabled for OFFLINE because, when the incoming and outgoing time column names are the same, there is no way to determine whether the time conversion has already happened. OFFLINE segments might be pre-aggregated, and their time values might already have been converted. If time transformation is needed for OFFLINE segment generation in the future, an extra flag can be added for it.
---
 .../realtime/HLRealtimeSegmentDataManager.java     |  4 +--
 .../realtime/LLRealtimeSegmentDataManager.java     |  2 +-
 .../recordtransformer/CompoundTransformer.java     | 24 +++++++++++--
 .../data/recordtransformer/TimeTransformer.java    |  6 +---
 .../RecordReaderSegmentCreationDataSource.java     |  3 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  2 +-
 .../recordtransformer/RecordTransformerTest.java   | 42 ++++++++++++++++++++--
 7 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 25f3858..439a451 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -102,8 +102,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     super();
     _segmentVersion = indexLoadingConfig.getSegmentVersion();
     this.schema = schema;
-    _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
-    this.serverMetrics =serverMetrics;
+    _recordTransformer = CompoundTransformer.getRealtimeTransformer(schema);
+    this.serverMetrics = serverMetrics;
     this.segmentName = realtimeSegmentZKMetadata.getSegmentName();
     this.tableName = tableConfig.getTableName();
     this.timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index ec19064..202595f 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1061,7 +1061,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();
 
     // Create record transformer
-    _recordTransformer = CompoundTransformer.getDefaultTransformer(schema);
+    _recordTransformer = CompoundTransformer.getRealtimeTransformer(schema);
     makeStreamConsumer("Starting");
     makeStreamMetadataProvider("Starting");
 
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java
index 1008a45..5e9b83e 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/CompoundTransformer.java
@@ -30,12 +30,30 @@ public class CompoundTransformer implements RecordTransformer {
   private final List<RecordTransformer> _transformers;
 
   /**
-   * Returns a record transformer that performs time transform, expressions transform and data type transform.
+   * Returns a record transformer for OFFLINE segment generation that performs expressions transform and data type
+   * transform.
+   * <p>NOTE: NO TIME TRANSFORMATION
+   * <p>NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS
+   * <ul>
+   *   <li>
+   *     We put {@link SanitationTransformer} after {@link DataTypeTransformer} so that before sanitation, all values
+   *     follow the data types defined in the {@link Schema}.
+   *   </li>
+   * </ul>
+   */
+  public static CompoundTransformer getOfflineTransformer(Schema schema) {
+    return new CompoundTransformer(Arrays.asList(new ExpressionTransformer(schema), new DataTypeTransformer(schema),
+        new SanitationTransformer(schema)));
+  }
+
+  /**
+   * Returns a record transformer for REALTIME segment consumption that performs time transform, expressions transform
+   * and data type transform.
    * <p>NOTE: DO NOT CHANGE THE ORDER OF THE RECORD TRANSFORMERS
    * <ul>
    *   <li>
    *     We put {@link ExpressionTransformer} after {@link TimeTransformer} so that expression can work on outgoing time
-   *     column
+   *     column.
    *   </li>
    *   <li>
    *     We put {@link SanitationTransformer} after {@link DataTypeTransformer} so that before sanitation, all values
@@ -43,7 +61,7 @@ public class CompoundTransformer implements RecordTransformer {
    *   </li>
    * </ul>
    */
-  public static CompoundTransformer getDefaultTransformer(Schema schema) {
+  public static CompoundTransformer getRealtimeTransformer(Schema schema) {
     return new CompoundTransformer(
         Arrays.asList(new TimeTransformer(schema), new ExpressionTransformer(schema), new DataTypeTransformer(schema),
             new SanitationTransformer(schema)));
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
index 2ee8e00..8d403d3 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
@@ -55,11 +55,7 @@ public class TimeTransformer implements RecordTransformer {
     if (_timeConverter == null) {
       return record;
     }
-    // Skip transformation if outgoing value already exist
-    // NOTE: outgoing value might already exist for OFFLINE data
-    if (record.getValue(_outgoingTimeColumn) == null) {
-      record.putField(_outgoingTimeColumn, _timeConverter.convert(record.getValue(_incomingTimeColumn)));
-    }
+    record.putField(_outgoingTimeColumn, _timeConverter.convert(record.getValue(_incomingTimeColumn)));
     return record;
   }
 }
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
index c5778b1..6d0f976 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -42,8 +42,7 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
   @Override
   public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) {
     try {
-      RecordTransformer recordTransformer =
-          CompoundTransformer.getDefaultTransformer(statsCollectorConfig.getSchema());
+      RecordTransformer recordTransformer = CompoundTransformer.getOfflineTransformer(statsCollectorConfig.getSchema());
 
       SegmentPreIndexStatsCollector collector = new SegmentPreIndexStatsCollectorImpl(statsCollectorConfig);
       collector.init();
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 62f96dd..e104a1c 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -107,7 +107,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource) {
-    init(config, dataSource, CompoundTransformer.getDefaultTransformer(dataSource.getRecordReader().getSchema()));
+    init(config, dataSource, CompoundTransformer.getOfflineTransformer(dataSource.getRecordReader().getSchema()));
   }
 
   public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
index fb5f0fc..0590aae 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
@@ -86,6 +86,20 @@ public class RecordTransformerTest {
   }
 
   @Test
+  public void testTimeTransformerWithSameIncomingOutgoingColumnName() {
+    Schema schema = new Schema.SchemaBuilder().addTime("time", 6, TimeUnit.HOURS, FieldSpec.DataType.INT, "time", 1,
+        TimeUnit.MILLISECONDS, FieldSpec.DataType.LONG).build();
+    RecordTransformer transformer = new TimeTransformer(schema);
+    GenericRow record = new GenericRow();
+    record.putField("time", 123);
+
+    // With the same incoming and outgoing column name, transform multiple times will return different results
+    record = transformer.transform(record);
+    assertNotNull(record);
+    assertEquals(record.getValue("time"), 123 * 6 * 3600 * 1000L);
+  }
+
+  @Test
   public void testDataTypeTransformer() {
     RecordTransformer transformer = new DataTypeTransformer(SCHEMA);
     GenericRow record = getRecord();
@@ -124,8 +138,32 @@ public class RecordTransformerTest {
   }
 
   @Test
-  public void testDefaultTransformer() {
-    RecordTransformer transformer = CompoundTransformer.getDefaultTransformer(SCHEMA);
+  public void testOfflineTransformer() {
+    RecordTransformer transformer = CompoundTransformer.getOfflineTransformer(SCHEMA);
+    GenericRow record = getRecord();
+    for (int i = 0; i < NUM_ROUNDS; i++) {
+      record = transformer.transform(record);
+      assertNotNull(record);
+      assertEquals(record.getValue("svInt"), 123);
+      assertEquals(record.getValue("svLong"), 123L);
+      assertEquals(record.getValue("svFloat"), 123f);
+      assertEquals(record.getValue("svDouble"), 123d);
+      assertEquals(record.getValue("svBytes"), new byte[]{123, 123});
+      assertEquals(record.getValue("mvInt"), new Object[]{123});
+      assertEquals(record.getValue("mvLong"), new Object[]{123L});
+      assertEquals(record.getValue("mvFloat"), new Object[]{123f});
+      assertEquals(record.getValue("mvDouble"), new Object[]{123d});
+      assertEquals(record.getValue("svStringWithNullCharacters"), "1");
+      assertEquals(record.getValue("svStringWithLengthLimit"), "12");
+      assertEquals(record.getValue("incoming"), "123");
+      // No time conversion for OFFLINE transformer
+      assertEquals(record.getValue("outgoing"), Long.MIN_VALUE);
+    }
+  }
+
+  @Test
+  public void testRealtimeTransformer() {
+    RecordTransformer transformer = CompoundTransformer.getRealtimeTransformer(SCHEMA);
     GenericRow record = getRecord();
     for (int i = 0; i < NUM_ROUNDS; i++) {
       record = transformer.transform(record);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org