Posted to commits@druid.apache.org by su...@apache.org on 2020/09/09 00:36:22 UTC

[druid] branch master updated: Fix stringFirst/stringLast rollup during ingestion (#10332)

This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f0da3  Fix stringFirst/stringLast rollup during ingestion (#10332)
e5f0da3 is described below

commit e5f0da30ae15369f66c7e9ecc05a41c3d49eb2e6
Author: Joy Kent <30...@users.noreply.github.com>
AuthorDate: Tue Sep 8 17:36:04 2020 -0700

    Fix stringFirst/stringLast rollup during ingestion (#10332)
    
    * Add IndexMergerRollupTest
    
    This changelist adds a test that merges indexes using the StringFirst/StringLast aggregators.
    
    * Fix StringFirstAggregateCombiner/StringLastAggregateCombiner
    
    The segment-level type for stringFirst/stringLast is SerializablePairLongString,
    not String. This changelist fixes the combiners to operate on that type.
    
    * Fix EarliestLatestAnySqlAggregator to handle COMPLEX type
    
    This changelist allows EarliestLatestAnySqlAggregator to accept a COMPLEX
    type as an operand. The return type is set to VARCHAR, since a COMPLEX
    column is only generated by stringFirst/stringLast during ingestion rollup.
    
    * Return the value with the smaller timestamp in the StringFirstAggregatorFactory.combine function (see the sketch after this message)
    
    * Add integration tests for stringFirst/stringLast during ingestion
    
    * Use one EarliestLatestReturnTypeInference instance
    
    Co-authored-by: Joy Kent <jo...@automonic.ai>
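
For illustration only (not part of this commit): a minimal, self-contained Java
sketch of the combine rule described above: at merge time the intermediate value
is a (timestamp, string) pair, and "first" keeps the pair with the smaller
timestamp. The PairLongString record is a hypothetical stand-in for Druid's
SerializablePairLongString, not the actual class.

    import java.util.Comparator;

    public class FirstCombineSketch
    {
      // Hypothetical stand-in for SerializablePairLongString: a timestamp plus the string value.
      record PairLongString(long timestamp, String value) {}

      // Mirrors the TIME_COMPARATOR idea: order pairs by their timestamp only.
      static final Comparator<PairLongString> TIME_COMPARATOR =
          Comparator.comparingLong(PairLongString::timestamp);

      // "first" semantics: keep whichever pair carries the smaller timestamp.
      static PairLongString combineFirst(PairLongString current, PairLongString incoming)
      {
        return TIME_COMPARATOR.compare(current, incoming) > 0 ? incoming : current;
      }

      public static void main(String[] args)
      {
        PairLongString a = new PairLongString(1000L, "nuclear");
        PairLongString b = new PairLongString(2000L, "stringer");
        // Prints "nuclear": the earlier pair wins regardless of fold order.
        System.out.println(combineFirst(b, a).value());
      }
    }
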
---
 .../apache/druid/tests/indexer/ITIndexerTest.java  |  42 +++++++
 .../resources/indexer/wikipedia_index_queries.json |  33 ++++++
 .../indexer/wikipedia_merge_index_queries.json     |  42 +++++++
 .../indexer/wikipedia_merge_index_task.json        |  70 ++++++++++++
 ...edia_merge_reindex_druid_input_source_task.json |  63 +++++++++++
 .../indexer/wikipedia_merge_reindex_task.json      |  65 +++++++++++
 .../first/StringFirstAggregateCombiner.java        |  23 ++--
 .../last/StringLastAggregateCombiner.java          |  21 ++--
 .../first/StringFirstAggregationTest.java          |  11 +-
 .../last/StringLastAggregationTest.java            |  11 +-
 .../druid/segment/IndexMergerRollupTest.java       | 121 +++++++++++++++++++++
 .../builtin/EarliestLatestAnySqlAggregator.java    |  37 ++++++-
 12 files changed, 506 insertions(+), 33 deletions(-)

diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 95b16de..a602d4a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -45,6 +45,15 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   private static final String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_reindex_queries.json";
   private static final String REINDEX_DATASOURCE = "wikipedia_reindex_test";
 
+  private static final String MERGE_INDEX_TASK = "/indexer/wikipedia_merge_index_task.json";
+  private static final String MERGE_INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
+  private static final String MERGE_INDEX_DATASOURCE = "wikipedia_merge_index_test";
+
+  private static final String MERGE_REINDEX_TASK = "/indexer/wikipedia_merge_reindex_task.json";
+  private static final String MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE = "/indexer/wikipedia_merge_reindex_druid_input_source_task.json";
+  private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json";
+  private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test";
+
   @Test
   public void testIndexData() throws Exception
   {
@@ -110,4 +119,37 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       );
     }
   }
+
+  @Test
+  public void testMERGEIndexData() throws Exception
+  {
+    final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
+    final String reindexDatasourceWithDruidInputSource = MERGE_REINDEX_DATASOURCE + "-testMergeReIndexData-druidInputSource";
+    try (
+        final Closeable ignored1 = unloader(MERGE_INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored2 = unloader(reindexDatasource + config.getExtraDatasourceNameSuffix());
+        final Closeable ignored3 = unloader(reindexDatasourceWithDruidInputSource + config.getExtraDatasourceNameSuffix())
+    ) {
+      doIndexTest(
+          MERGE_INDEX_DATASOURCE,
+          MERGE_INDEX_TASK,
+          MERGE_INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true
+      );
+      doReindexTest(
+          MERGE_INDEX_DATASOURCE,
+          reindexDatasource,
+          MERGE_REINDEX_TASK,
+          MERGE_REINDEX_QUERIES_RESOURCE
+      );
+      doReindexTest(
+          MERGE_INDEX_DATASOURCE,
+          reindexDatasourceWithDruidInputSource,
+          MERGE_REINDEX_TASK_WITH_DRUID_INPUT_SOURCE,
+          MERGE_INDEX_QUERIES_RESOURCE
+      );
+    }
+  }
 }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
index bf2a70b..928effe 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json
@@ -113,5 +113,38 @@
                 "rows" : 1
             }
         } ]
+    },
+    {
+        "description": "timeseries, stringFirst/stringLast aggs, all",
+        "query":{
+            "queryType" : "timeseries",
+            "dataSource": "%%DATASOURCE%%",
+            "granularity":"day",
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ],
+            "filter":null,
+            "aggregations":[
+                {
+                    "type": "stringFirst",
+                    "name": "first_user",
+                    "fieldName": "user"
+                },
+                {
+                    "type":"stringLast",
+                    "name":"last_user",
+                    "fieldName":"user"
+                }
+            ]
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2013-08-31T00:00:00.000Z",
+                "result" : {
+                    "first_user":"nuclear",
+                    "last_user":"stringer"
+                }
+            }
+        ]
     }
 ]
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json
new file mode 100644
index 0000000..ab46749
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_queries.json
@@ -0,0 +1,42 @@
+[
+    {
+        "description": "groupby, stringFirst/stringLast rollup aggs, all",
+        "query":{
+            "queryType" : "groupBy",
+            "dataSource": "%%DATASOURCE%%",
+            "granularity":"day",
+            "dimensions":[
+                "continent"
+            ],
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ],
+            "filter":{
+                "type":"selector",
+                "dimension":"continent",
+                "value":"Asia"
+            },
+            "aggregations":[
+                {
+                    "type": "stringFirst",
+                    "name": "earliest_user",
+                    "fieldName": "first_user"
+                },
+                {
+                    "type":"stringLast",
+                    "name":"latest_user",
+                    "fieldName":"last_user"
+                }
+            ]
+        },
+        "expectedResults":[ {
+            "version" : "v1",
+            "timestamp" : "2013-08-31T00:00:00.000Z",
+            "event" : {
+                "continent":"Asia",
+                "earliest_user":"masterYi",
+                "latest_user":"stringer"
+            }
+        } ]
+    }
+]
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json
new file mode 100644
index 0000000..43264a8
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_index_task.json
@@ -0,0 +1,70 @@
+{
+    "type": "index",
+    "spec": {
+        "dataSchema": {
+            "dataSource": "%%DATASOURCE%%",
+            "metricsSpec": [
+                {
+                    "type": "count",
+                    "name": "count"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                },
+                {
+                    "type": "stringFirst",
+                    "name": "first_user",
+                    "fieldName": "user"
+                },
+                {
+                    "type": "stringLast",
+                    "name": "last_user",
+                    "fieldName": "user"
+                }
+            ],
+            "granularitySpec": {
+                "segmentGranularity": "DAY",
+                "queryGranularity": "DAY",
+                "intervals" : [ "2013-08-31/2013-09-02" ]
+            },
+            "parser": {
+                "parseSpec": {
+                    "format" : "json",
+                    "timestampSpec": {
+                        "column": "timestamp"
+                    },
+                    "dimensionsSpec": {
+                        "dimensions": [
+                            "continent"
+                        ]
+                    }
+                }
+            }
+        },
+        "ioConfig": {
+            "type": "index",
+            "firehose": {
+                "type": "local",
+                "baseDir": "/resources/data/batch_index/json",
+                "filter": "wikipedia_index_data*"
+            }
+        },
+        "tuningConfig": {
+            "type": "index",
+            "maxRowsPerSegment": 5,
+            "maxRowsInMemory": 2
+        }
+    }
+}
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json
new file mode 100644
index 0000000..9daae62
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_druid_input_source_task.json
@@ -0,0 +1,63 @@
+{
+    "type": "index",
+    "spec": {
+        "ioConfig": {
+            "type": "index",
+            "inputSource": {
+                "type": "druid",
+                "dataSource": "%%DATASOURCE%%",
+                "interval": "2013-08-31/2013-09-01"
+            }
+        },
+        "tuningConfig": {
+            "type": "index",
+            "partitionsSpec": {
+                "type": "dynamic"
+            }
+        },
+        "dataSchema": {
+            "dataSource": "%%REINDEX_DATASOURCE%%",
+            "granularitySpec": {
+                "type": "uniform",
+                "queryGranularity": "DAY",
+                "segmentGranularity": "DAY"
+            },
+            "timestampSpec": {
+                "column": "__time",
+                "format": "iso"
+            },
+            "dimensionsSpec": {
+                "dimensions": [
+                    "continent"
+                ]
+            },
+            "metricsSpec": [
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                },
+                {
+                    "type": "stringFirst",
+                    "name": "first_user",
+                    "fieldName": "first_user"
+                },
+                {
+                    "type": "stringLast",
+                    "name": "last_user",
+                    "fieldName": "last_user"
+                }
+            ]
+        }
+    }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json
new file mode 100644
index 0000000..127461d
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_merge_reindex_task.json
@@ -0,0 +1,65 @@
+{
+    "type": "index",
+    "spec": {
+        "dataSchema": {
+            "dataSource": "%%REINDEX_DATASOURCE%%",
+            "metricsSpec": [
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                },
+                {
+                    "type": "stringFirst",
+                    "name": "first_user",
+                    "fieldName": "first_user"
+                },
+                {
+                    "type": "stringLast",
+                    "name": "last_user",
+                    "fieldName": "last_user"
+                }
+            ],
+            "granularitySpec": {
+                "segmentGranularity": "DAY",
+                "queryGranularity": "DAY",
+                "intervals" : [ "2013-08-31/2013-09-01" ]
+            },
+            "parser": {
+                "parseSpec": {
+                    "format" : "json",
+                    "timestampSpec": {
+                        "column": "timestamp",
+                        "format": "iso"
+                    },
+                    "dimensionsSpec": {
+                        "dimensions": [
+                            "continent"
+                        ]
+                    }
+                }
+            }
+        },
+        "ioConfig": {
+            "type": "index",
+            "firehose": {
+                "type": "ingestSegment",
+                "dataSource": "%%DATASOURCE%%",
+                "interval": "2013-08-31/2013-09-01"
+            }
+        },
+        "tuningConfig": {
+            "type": "index"
+        }
+    }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java
index 9a86a13..4ccf92e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/first/StringFirstAggregateCombiner.java
@@ -20,41 +20,40 @@
 package org.apache.druid.query.aggregation.first;
 
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
 import org.apache.druid.segment.ColumnValueSelector;
 
 import javax.annotation.Nullable;
 
-public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
+public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<SerializablePairLongString>
 {
-  private String firstString;
-  private boolean isReset = false;
+  private SerializablePairLongString firstValue;
 
   @Override
   public void reset(ColumnValueSelector selector)
   {
-    firstString = (String) selector.getObject();
-    isReset = true;
+    firstValue = (SerializablePairLongString) selector.getObject();
   }
 
   @Override
   public void fold(ColumnValueSelector selector)
   {
-    if (!isReset) {
-      firstString = (String) selector.getObject();
-      isReset = true;
+    SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject();
+    if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(firstValue, newValue) > 0) {
+      firstValue = newValue;
     }
   }
 
   @Nullable
   @Override
-  public String getObject()
+  public SerializablePairLongString getObject()
   {
-    return firstString;
+    return firstValue;
   }
 
   @Override
-  public Class<String> classOfObject()
+  public Class<SerializablePairLongString> classOfObject()
   {
-    return String.class;
+    return SerializablePairLongString.class;
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java
index e6ea2ff..47f2119 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/last/StringLastAggregateCombiner.java
@@ -20,36 +20,41 @@
 package org.apache.druid.query.aggregation.last;
 
 import org.apache.druid.query.aggregation.ObjectAggregateCombiner;
+import org.apache.druid.query.aggregation.SerializablePairLongString;
+import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 
 import javax.annotation.Nullable;
 
-public class StringLastAggregateCombiner extends ObjectAggregateCombiner<String>
+public class StringLastAggregateCombiner extends ObjectAggregateCombiner<SerializablePairLongString>
 {
-  private String lastString;
+  private SerializablePairLongString lastValue;
 
   @Override
   public void reset(ColumnValueSelector selector)
   {
-    lastString = (String) selector.getObject();
+    lastValue = (SerializablePairLongString) selector.getObject();
   }
 
   @Override
   public void fold(ColumnValueSelector selector)
   {
-    lastString = (String) selector.getObject();
+    SerializablePairLongString newValue = (SerializablePairLongString) selector.getObject();
+    if (StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lastValue, newValue) < 0) {
+      lastValue = newValue;
+    }
   }
 
   @Nullable
   @Override
-  public String getObject()
+  public SerializablePairLongString getObject()
   {
-    return lastString;
+    return lastValue;
   }
 
   @Override
-  public Class<String> classOfObject()
+  public Class<SerializablePairLongString> classOfObject()
   {
-    return String.class;
+    return SerializablePairLongString.class;
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
index 039054d..854cdbf 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.aggregation.first;
 
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -62,6 +63,7 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   @Before
   public void setup()
   {
+    NullHandling.initializeForTests();
     stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
     combiningAggFactory = stringFirstAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
@@ -175,24 +177,23 @@ public class StringFirstAggregationTest extends InitializedNullHandlingTest
   @Test
   public void testStringFirstAggregateCombiner()
   {
-    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
-    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs);
 
     AggregateCombiner stringFirstAggregateCombiner =
         combiningAggFactory.makeAggregateCombiner();
 
     stringFirstAggregateCombiner.reset(columnSelector);
 
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
 
     columnSelector.increment();
     stringFirstAggregateCombiner.fold(columnSelector);
 
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
 
     stringFirstAggregateCombiner.reset(columnSelector);
 
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
   }
 
   private void aggregate(
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
index 39f9925..dbd2522 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.aggregation.last;
 
+import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.query.aggregation.AggregateCombiner;
 import org.apache.druid.query.aggregation.Aggregator;
@@ -61,6 +62,7 @@ public class StringLastAggregationTest
   @Before
   public void setup()
   {
+    NullHandling.initializeForTests();
     stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
     combiningAggFactory = stringLastAggFactory.getCombiningFactory();
     timeSelector = new TestLongColumnSelector(times);
@@ -159,23 +161,22 @@ public class StringLastAggregationTest
   @Test
   public void testStringLastAggregateCombiner()
   {
-    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
-    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(pairs);
 
     AggregateCombiner stringFirstAggregateCombiner = combiningAggFactory.makeAggregateCombiner();
 
     stringFirstAggregateCombiner.reset(columnSelector);
 
-    Assert.assertEquals(strings[0], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[0], stringFirstAggregateCombiner.getObject());
 
     columnSelector.increment();
     stringFirstAggregateCombiner.fold(columnSelector);
 
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
 
     stringFirstAggregateCombiner.reset(columnSelector);
 
-    Assert.assertEquals(strings[1], stringFirstAggregateCombiner.getObject());
+    Assert.assertEquals(pairs[1], stringFirstAggregateCombiner.getObject());
   }
 
   private void aggregate(
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java
new file mode 100644
index 0000000..547abcb
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.MapBasedInputRow;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import org.apache.druid.query.aggregation.last.StringLastAggregatorFactory;
+import org.apache.druid.segment.data.IncrementalIndexTest;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class IndexMergerRollupTest extends InitializedNullHandlingTest
+{
+
+  private IndexMerger indexMerger;
+  private IndexIO indexIO;
+  private IndexSpec indexSpec;
+
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp()
+  {
+    indexMerger = TestHelper
+        .getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
+    indexIO = TestHelper.getTestIndexIO();
+    indexSpec = new IndexSpec();
+  }
+
+  private void testStringFirstLastRollup(
+      AggregatorFactory[] aggregatorFactories
+  ) throws Exception
+  {
+    List<Map<String, Object>> eventsList = Arrays.asList(
+        new HashMap<String, Object>()
+        {
+          {
+            put("d", "d1");
+            put("m", "m1");
+          }
+        },
+        new HashMap<String, Object>()
+        {
+          {
+            put("d", "d1");
+            put("m", "m2");
+          }
+        }
+    );
+
+    final File tempDir = temporaryFolder.newFolder();
+
+    List<QueryableIndex> indexes = new ArrayList<>();
+    Instant time = Instant.now();
+
+    for (Map<String, Object> events : eventsList) {
+      IncrementalIndex toPersist = IncrementalIndexTest.createIndex(aggregatorFactories);
+
+      toPersist.add(new MapBasedInputRow(time.toEpochMilli(), ImmutableList.of("d"), events));
+      indexes.add(indexIO.loadIndex(indexMerger.persist(toPersist, tempDir, indexSpec, null)));
+    }
+
+    File indexFile = indexMerger
+        .mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null);
+    try (QueryableIndex mergedIndex = indexIO.loadIndex(indexFile)) {
+      Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows());
+    }
+  }
+
+  @Test
+  public void testStringFirstRollup() throws Exception
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new StringFirstAggregatorFactory("m", "m", 1024)
+    };
+    testStringFirstLastRollup(aggregatorFactories);
+  }
+
+  @Test
+  public void testStringLastRollup() throws Exception
+  {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new StringLastAggregatorFactory("m", "m", 1024)
+    };
+    testStringFirstLastRollup(aggregatorFactories);
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
index 800ee7d..8ec917e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/aggregation/builtin/EarliestLatestAnySqlAggregator.java
@@ -28,9 +28,12 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
 import org.apache.calcite.util.Optionality;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -85,6 +88,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
           case DOUBLE:
             return new DoubleFirstAggregatorFactory(name, fieldName);
           case STRING:
+          case COMPLEX:
             return new StringFirstAggregatorFactory(name, fieldName, maxStringBytes);
           default:
             throw new ISE("Cannot build EARLIEST aggregatorFactory for type[%s]", type);
@@ -104,6 +108,7 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
           case DOUBLE:
             return new DoubleLastAggregatorFactory(name, fieldName);
           case STRING:
+          case COMPLEX:
             return new StringLastAggregatorFactory(name, fieldName, maxStringBytes);
           default:
             throw new ISE("Cannot build LATEST aggregatorFactory for type[%s]", type);
@@ -219,22 +224,48 @@ public class EarliestLatestAnySqlAggregator implements SqlAggregator
     );
   }
 
+  static class EarliestLatestReturnTypeInference implements SqlReturnTypeInference
+  {
+    private final int ordinal;
+
+    public EarliestLatestReturnTypeInference(int ordinal)
+    {
+      this.ordinal = ordinal;
+    }
+
+    @Override
+    public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
+    {
+      RelDataType type = sqlOperatorBinding.getOperandType(this.ordinal);
+      // For a type that is neither numeric nor string (i.e. a COMPLEX type), set the return type to VARCHAR.
+      if (!SqlTypeUtil.isNumeric(type) &&
+          !SqlTypeUtil.isString(type)) {
+        return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
+      } else {
+        return type;
+      }
+    }
+  }
+
   private static class EarliestLatestSqlAggFunction extends SqlAggFunction
   {
+    private static final EarliestLatestReturnTypeInference EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE =
+        new EarliestLatestReturnTypeInference(0);
+
     EarliestLatestSqlAggFunction(AggregatorType aggregatorType)
     {
       super(
           aggregatorType.name(),
           null,
           SqlKind.OTHER_FUNCTION,
-          ReturnTypes.ARG0,
+          EARLIEST_LATEST_ARG0_RETURN_TYPE_INFERENCE,
           InferTypes.RETURN_TYPE,
           OperandTypes.or(
               OperandTypes.NUMERIC,
               OperandTypes.BOOLEAN,
               OperandTypes.sequence(
                   "'" + aggregatorType.name() + "(expr, maxBytesPerString)'\n",
-                  OperandTypes.STRING,
+                  OperandTypes.ANY,
                   OperandTypes.and(OperandTypes.NUMERIC, OperandTypes.LITERAL)
               )
           ),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org