You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/01 17:52:56 UTC

[GitHub] jihoonson closed pull request #5789: Add stringLast and stringFirst aggregators extension

jihoonson closed pull request #5789: Add stringLast and stringFirst aggregators extension
URL: https://github.com/apache/incubator-druid/pull/5789
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/content/querying/aggregations.md b/docs/content/querying/aggregations.md
index b0ce5cc24c9..3f6b5e7c096 100644
--- a/docs/content/querying/aggregations.md
+++ b/docs/content/querying/aggregations.md
@@ -102,7 +102,7 @@ Computes and stores the sum of values as 32-bit floating point value. Similar to
 
 ### First / Last aggregator
 
-First and Last aggregator cannot be used in ingestion spec, and should only be specified as part of queries.
+(Double/Float/Long) First and Last aggregators cannot be used in an ingestion spec, and should only be specified as part of queries.
 
 Note that queries with first/last aggregators on a segment created with rollup enabled will return the rolled up value, and not the last value within the raw ingested data.
 
@@ -178,6 +178,36 @@ Note that queries with first/last aggregators on a segment created with rollup e
 }
 ```
 
+#### `stringFirst` aggregator
+
+`stringFirst` computes the metric value with the minimum timestamp, or `null` if no rows exist.
+
+```json
+{
+  "type" : "stringFirst",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "maxStringBytes" : <integer>, # (optional, defaults to 1024)
+  "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
+
+
+#### `stringLast` aggregator
+
+`stringLast` computes the metric value with the maximum timestamp, or `null` if no rows exist.
+
+```json
+{
+  "type" : "stringLast",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "maxStringBytes" : <integer>, # (optional, defaults to 1024)
+  "filterNullValues" : <boolean> # (optional, defaults to false)
+}
+```
+
 ### JavaScript aggregator
 
 Computes an arbitrary JavaScript function over a set of columns (both metrics and dimensions are allowed). Your
diff --git a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
index 94deda097a2..d3cc9a7365b 100644
--- a/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
+++ b/processing/src/main/java/io/druid/jackson/AggregatorsModule.java
@@ -38,10 +38,13 @@
 import io.druid.query.aggregation.LongMinAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.SerializablePairLongStringSerde;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
 import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import io.druid.query.aggregation.first.FloatFirstAggregatorFactory;
 import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.aggregation.first.StringFirstFoldingAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
@@ -49,6 +52,8 @@
 import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import io.druid.query.aggregation.last.FloatLastAggregatorFactory;
 import io.druid.query.aggregation.last.LongLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastAggregatorFactory;
+import io.druid.query.aggregation.last.StringLastFoldingAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
@@ -74,7 +79,14 @@ public AggregatorsModule()
     }
 
     if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
-      ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
+      ComplexMetrics.registerSerde(
+          "preComputedHyperUnique",
+          new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault())
+      );
+    }
+
+    if (ComplexMetrics.getSerdeForType("serializablePairLongString") == null) {
+      ComplexMetrics.registerSerde("serializablePairLongString", new SerializablePairLongStringSerde());
     }
 
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
@@ -101,9 +113,13 @@ public AggregatorsModule()
       @JsonSubTypes.Type(name = "longFirst", value = LongFirstAggregatorFactory.class),
       @JsonSubTypes.Type(name = "doubleFirst", value = DoubleFirstAggregatorFactory.class),
       @JsonSubTypes.Type(name = "floatFirst", value = FloatFirstAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringFirst", value = StringFirstAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringFirstFold", value = StringFirstFoldingAggregatorFactory.class),
       @JsonSubTypes.Type(name = "longLast", value = LongLastAggregatorFactory.class),
       @JsonSubTypes.Type(name = "doubleLast", value = DoubleLastAggregatorFactory.class),
-      @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class)
+      @JsonSubTypes.Type(name = "floatLast", value = FloatLastAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringLast", value = StringLastAggregatorFactory.class),
+      @JsonSubTypes.Type(name = "stringLastFold", value = StringLastFoldingAggregatorFactory.class)
   })
   public interface AggregatorFactoryMixin
   {
diff --git a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
index 610e6ba6636..eedd09580de 100644
--- a/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/io/druid/query/aggregation/AggregatorUtil.java
@@ -94,6 +94,10 @@
   public static final byte ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID = 0x29;
   public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID = 0x2A;
 
+  // StringFirst, StringLast aggregator
+  public static final byte STRING_FIRST_CACHE_TYPE_ID = 0x2B;
+  public static final byte STRING_LAST_CACHE_TYPE_ID = 0x2C;
+
   /**
    * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
    *
diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
new file mode 100644
index 00000000000..91f9b2622a7
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongString.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.collections.SerializablePair;
+
+public class SerializablePairLongString extends SerializablePair<Long, String> // lhs is a Long, rhs is a String
+{
+  @JsonCreator // lets Jackson build the pair from its "lhs" and "rhs" JSON properties
+  public SerializablePairLongString(@JsonProperty("lhs") Long lhs, @JsonProperty("rhs") String rhs)
+  {
+    super(lhs, rhs);
+  }
+}
+
+
diff --git a/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
new file mode 100644
index 00000000000..ca245fa1339
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/SerializablePairLongStringSerde.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation;
+
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.segment.GenericColumnSerializer;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import io.druid.segment.writeout.SegmentWriteOutMedium;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * The SerializablePairLongStringSerde serializes a Long-String pair (SerializablePairLongString).
+ * The serialization structure is: Long:Integer:String
+ * <p>
+ * The class is used on first/last String aggregators to store the time and the first/last string.
+ * Long:Integer:String -> Timestamp:StringSize:StringData
+ */
+public class SerializablePairLongStringSerde extends ComplexMetricSerde
+{
+  private static final String TYPE_NAME = "serializablePairLongString";
+
+  @Override
+  public String getTypeName()
+  {
+    return TYPE_NAME;
+  }
+
+  /**
+   * Returns an extractor that passes the raw row value for the metric through unchanged.
+   */
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<SerializablePairLongString> extractedClass()
+      {
+        return SerializablePairLongString.class;
+      }
+
+      @Override
+      public Object extractValue(InputRow inputRow, String metricName)
+      {
+        return inputRow.getRaw(metricName);
+      }
+    };
+  }
+
+  /**
+   * Reads the column as a {@link GenericIndexed} of pairs and registers it on the builder as a complex column.
+   */
+  @Override
+  public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
+  {
+    final GenericIndexed pairs = GenericIndexed.read(buffer, getObjectStrategy(), columnBuilder.getFileMapper());
+    columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), pairs));
+  }
+
+  /**
+   * Builds the strategy that converts pairs to and from the Timestamp:StringSize:StringData byte layout.
+   */
+  @Override
+  public ObjectStrategy getObjectStrategy()
+  {
+    return new ObjectStrategy<SerializablePairLongString>()
+    {
+      @Override
+      public int compare(@Nullable SerializablePairLongString o1, @Nullable SerializablePairLongString o2)
+      {
+        return StringFirstAggregatorFactory.VALUE_COMPARATOR.compare(o1, o2);
+      }
+
+      @Override
+      public Class<? extends SerializablePairLongString> getClazz()
+      {
+        return SerializablePairLongString.class;
+      }
+
+      @Override
+      public SerializablePairLongString fromByteBuffer(ByteBuffer buffer, int numBytes)
+      {
+        // Decode through a separate read-only view so the caller's buffer position is not advanced.
+        final ByteBuffer view = buffer.asReadOnlyBuffer();
+        final long timestamp = view.getLong();
+        final int length = view.getInt();
+
+        if (length <= 0) {
+          // No string bytes were stored: the pair carries a null string.
+          return new SerializablePairLongString(timestamp, null);
+        }
+
+        final byte[] utf8 = new byte[length];
+        view.get(utf8, 0, length);
+        return new SerializablePairLongString(timestamp, StringUtils.fromUtf8(utf8));
+      }
+
+      @Override
+      public byte[] toBytes(SerializablePairLongString val)
+      {
+        // A null string is written as a zero-length payload after the timestamp and size fields.
+        final String text = val.rhs;
+        final byte[] utf8 = text == null ? new byte[0] : StringUtils.toUtf8(text);
+
+        final ByteBuffer out = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + utf8.length);
+        out.putLong(val.lhs);
+        out.putInt(utf8.length);
+        out.put(utf8);
+        return out.array();
+      }
+    };
+  }
+
+  @Override
+  public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
+  {
+    return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
index cd646a7f872..7e4b4235879 100644
--- a/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/first/LongFirstAggregatorFactory.java
@@ -23,8 +23,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.Longs;
-import io.druid.java.util.common.StringUtils;
 import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
new file mode 100644
index 00000000000..20487f65900
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregateCombiner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+public class StringFirstAggregateCombiner extends ObjectAggregateCombiner<String>
+{
+  private String captured; // the string this combiner currently reports from getObject()
+  private boolean hasCaptured; // set once reset() or fold() has stored a value
+
+  @Override
+  public void reset(ColumnValueSelector selector)
+  {
+    captured = (String) selector.getObject(); // NOTE(review): assumes the selector yields Strings - confirm
+    hasCaptured = true;
+  }
+
+  @Override
+  public void fold(ColumnValueSelector selector)
+  {
+    if (!hasCaptured) { // a value that is already held is never replaced by a later fold
+      captured = (String) selector.getObject();
+      hasCaptured = true;
+    }
+  }
+
+  @Nullable
+  @Override
+  public String getObject()
+  {
+    return captured;
+  }
+
+  @Override
+  public Class<String> classOfObject()
+  {
+    return String.class;
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
new file mode 100644
index 00000000000..5710a610728
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+/** Aggregator that keeps, in its own fields, the string seen with the smallest timestamp so far. */
+public class StringFirstAggregator implements Aggregator
+{
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseLongColumnValueSelector timeSelector;
+  private final int maxStringBytes;
+
+  protected long firstTime = Long.MAX_VALUE;
+  protected String firstValue;
+
+  public StringFirstAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.maxStringBytes = maxStringBytes;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    final long rowTime = timeSelector.getLong();
+    if (rowTime >= firstTime) {
+      return;
+    }
+    firstTime = rowTime;
+
+    final Object raw = valueSelector.getObject();
+    if (raw == null) {
+      firstValue = null;
+      return;
+    }
+
+    if (raw instanceof String) {
+      firstValue = (String) raw;
+    } else if (raw instanceof SerializablePairLongString) {
+      firstValue = ((SerializablePairLongString) raw).rhs;
+    } else {
+      throw new ISE(
+          "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
+          raw.getClass().getCanonicalName()
+      );
+    }
+
+    // NOTE(review): the limit is applied to the char count here even though it is named in bytes - confirm.
+    if (firstValue != null && firstValue.length() > maxStringBytes) {
+      firstValue = firstValue.substring(0, maxStringBytes);
+    }
+  }
+
+  @Override
+  public Object get()
+  {
+    return new SerializablePairLongString(firstTime, firstValue);
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not support getLong()");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("StringFirstAggregator does not support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
new file mode 100644
index 00000000000..187e8915481
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstAggregatorFactory.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import io.druid.query.aggregation.AggregateCombiner;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.cache.CacheKeyBuilder;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Factory for the {@code stringFirst} aggregator: the string value that has the minimum timestamp.
+ */
+@JsonTypeName("stringFirst")
+public class StringFirstAggregatorFactory extends AggregatorFactory
+{
+  public static final int DEFAULT_MAX_STRING_SIZE = 1024;
+
+  /**
+   * Orders two {@link SerializablePairLongString} objects by timestamp only; both arguments must be non-null.
+   */
+  public static final Comparator TIME_COMPARATOR = (o1, o2) -> Longs.compare(
+      ((SerializablePairLongString) o1).lhs,
+      ((SerializablePairLongString) o2).lhs
+  );
+
+  /**
+   * Orders pairs: a null pair sorts before any non-null pair, then pairs are ordered by timestamp, and pairs
+   * with equal timestamps are ordered by string with a null string sorting first.
+   * Note: comparing the strings may not be meaningful for first/last aggregators.
+   */
+  public static final Comparator<SerializablePairLongString> VALUE_COMPARATOR = Comparator.nullsFirst(
+      Comparator.comparing((SerializablePairLongString pair) -> pair.lhs)
+                .thenComparing(
+                    (SerializablePairLongString pair) -> pair.rhs,
+                    Comparator.nullsFirst(Comparator.<String>naturalOrder())
+                )
+  );
+
+  private final String fieldName;
+  private final String name;
+  protected final int maxStringBytes;
+
+  /**
+   * @param name           output name of the aggregator, must not be null
+   * @param fieldName      name of the input column, must not be null
+   * @param maxStringBytes size limit for the stored string; {@link #DEFAULT_MAX_STRING_SIZE} is used when null
+   */
+  @JsonCreator
+  public StringFirstAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+    this.name = name;
+    this.fieldName = fieldName;
+    this.maxStringBytes = maxStringBytes == null ? DEFAULT_MAX_STRING_SIZE : maxStringBytes;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new StringFirstAggregator(
+        metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
+    );
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new StringFirstBufferAggregator(
+        metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
+    );
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return VALUE_COMPARATOR;
+  }
+
+  /**
+   * Merges two intermediate {@link SerializablePairLongString} results by keeping the one with the smaller
+   * timestamp, which is what "first" means; when the timestamps are equal, {@code lhs} is kept.
+   */
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    return TIME_COMPARATOR.compare(lhs, rhs) <= 0 ? lhs : rhs;
+  }
+
+  @Override
+  public AggregateCombiner makeAggregateCombiner()
+  {
+    return new StringFirstAggregateCombiner();
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new StringFirstFoldingAggregatorFactory(name, name, maxStringBytes);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(new StringFirstAggregatorFactory(fieldName, fieldName, maxStringBytes));
+  }
+
+  /**
+   * Rebuilds an intermediate result from a {@link Map} that holds the {@code lhs} and {@code rhs} entries.
+   */
+  @Override
+  public Object deserialize(Object object)
+  {
+    Map map = (Map) object;
+    return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
+  }
+
+  /**
+   * Reduces the intermediate pair to the final result, i.e. just the string.
+   */
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    return ((SerializablePairLongString) object).rhs;
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @JsonProperty
+  public Integer getMaxStringBytes()
+  {
+    return maxStringBytes;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(AggregatorUtil.STRING_FIRST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendInt(maxStringBytes)
+        .build();
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "serializablePairLongString";
+  }
+
+  /**
+   * Timestamp ({@code long}) plus string length ({@code int}) plus up to {@code maxStringBytes} of string data.
+   */
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return Long.BYTES + Integer.BYTES + maxStringBytes;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    StringFirstAggregatorFactory that = (StringFirstAggregatorFactory) o;
+
+    return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "StringFirstAggregatorFactory{" +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", maxStringBytes=" + maxStringBytes +
+           '}';
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
new file mode 100644
index 00000000000..c71cfbfc2de
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+/** Buffer-backed stringFirst aggregator; a slot holds a long timestamp, an int byte count, then the UTF-8 bytes. */
+public class StringFirstBufferAggregator implements BufferAggregator
+{
+  private final BaseLongColumnValueSelector timeSelector;
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final int maxStringBytes;
+
+  public StringFirstBufferAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.maxStringBytes = maxStringBytes;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    buf.putLong(position, Long.MAX_VALUE);
+    buf.putInt(position + Long.BYTES, 0);
+  }
+
+  /**
+   * Stores the current row's string if its timestamp is smaller than the one already in the slot. At most
+   * {@code maxStringBytes} bytes are written, and the cut never falls inside a multi-byte UTF-8 sequence.
+   */
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    final Object value = valueSelector.getObject();
+    long time = timeSelector.getLong();
+    String firstString = null;
+
+    if (value instanceof SerializablePairLongString) {
+      // A pair brings its own timestamp, which replaces the one read from the time selector.
+      final SerializablePairLongString pair = (SerializablePairLongString) value;
+      time = pair.lhs;
+      firstString = pair.rhs;
+    } else if (value instanceof String) {
+      firstString = (String) value;
+    } else if (value != null) {
+      throw new ISE(
+          "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
+          value.getClass().getCanonicalName()
+      );
+    }
+
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    if (time >= mutationBuffer.getLong(position)) {
+      return;
+    }
+
+    byte[] valueBytes = null;
+    int numBytes = 0;
+    if (firstString != null) {
+      // Cheap pre-cut: every char encodes to at least one byte, so extra chars could never fit anyway.
+      if (firstString.length() > maxStringBytes) {
+        firstString = firstString.substring(0, maxStringBytes);
+      }
+      valueBytes = StringUtils.toUtf8(firstString);
+      numBytes = Math.min(valueBytes.length, maxStringBytes);
+      // Back up while the first dropped byte is a continuation byte (10xxxxxx) to keep whole characters only.
+      while (numBytes > 0 && numBytes < valueBytes.length && (valueBytes[numBytes] & 0xC0) == 0x80) {
+        numBytes--;
+      }
+    }
+
+    mutationBuffer.putLong(position, time);
+    mutationBuffer.putInt(position + Long.BYTES, numBytes);
+    if (numBytes > 0) {
+      mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+      mutationBuffer.put(valueBytes, 0, numBytes);
+    }
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    final ByteBuffer mutationBuffer = buf.duplicate();
+    final long timeValue = mutationBuffer.getLong(position);
+    final int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+    if (stringSizeBytes <= 0) {
+      // A stored length of zero is reported as a null string.
+      return new SerializablePairLongString(timeValue, null);
+    }
+
+    final byte[] valueBytes = new byte[stringSizeBytes];
+    mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+    mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+    return new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstBufferAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstBufferAggregator does not support getLong()");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringFirstBufferAggregator does not support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("timeSelector", timeSelector);
+    inspector.visit("valueSelector", valueSelector);
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
new file mode 100644
index 00000000000..b268bafa5ae
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/first/StringFirstFoldingAggregatorFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Combining ("fold") variant of {@code stringFirst}. It reads {@link SerializablePairLongString} partial
+ * results from the column named {@link #getName()} and keeps the pair with the smallest timestamp.
+ */
+@JsonTypeName("stringFirstFold")
+public class StringFirstFoldingAggregatorFactory extends StringFirstAggregatorFactory
+{
+  public StringFirstFoldingAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    super(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+    return new StringFirstAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate()
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+        // A pair without a timestamp is skipped (unboxing it would throw), as in the buffered variant.
+        if (pair != null && pair.lhs != null && pair.lhs < firstTime) {
+          firstTime = pair.lhs;
+          firstValue = pair.rhs;
+        }
+      }
+    };
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+    final int limit = maxStringBytes;
+    return new StringFirstBufferAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate(ByteBuffer buf, int position)
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+        // Slot layout: 8-byte timestamp, 4-byte string length, then the UTF-8 bytes of the string.
+        ByteBuffer mutationBuffer = buf.duplicate();
+        if (pair == null || pair.lhs == null || pair.lhs >= mutationBuffer.getLong(position)) {
+          return;
+        }
+        mutationBuffer.putLong(position, pair.lhs);
+        byte[] valueBytes = pair.rhs == null ? new byte[0] : StringUtils.toUtf8(pair.rhs);
+        // Cap the payload at maxStringBytes; when cutting, back up so no multi-byte character is split.
+        int length = Math.min(valueBytes.length, limit);
+        while (length > 0 && length < valueBytes.length && (valueBytes[length] & 0xC0) == 0x80) {
+          length--;
+        }
+        mutationBuffer.putInt(position + Long.BYTES, length);
+        mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+        mutationBuffer.put(valueBytes, 0, length);
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("selector", selector);
+      }
+    };
+  }
+
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
index 91b43dde6b1..d2e34c43650 100644
--- a/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
+++ b/processing/src/main/java/io/druid/query/aggregation/last/LongLastAggregatorFactory.java
@@ -22,8 +22,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
-import io.druid.java.util.common.StringUtils;
 import io.druid.collections.SerializablePair;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.UOE;
 import io.druid.query.aggregation.AggregateCombiner;
 import io.druid.query.aggregation.Aggregator;
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
new file mode 100644
index 00000000000..6625f084e79
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregateCombiner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.ObjectAggregateCombiner;
+import io.druid.segment.ColumnValueSelector;
+
+import javax.annotation.Nullable;
+
+/** Keeps whichever string was most recently supplied through {@link #reset} or {@link #fold}. */
+public class StringLastAggregateCombiner extends ObjectAggregateCombiner<String>
+{
+  private String latest;
+  @Override
+  public void reset(ColumnValueSelector selector)
+  {
+    latest = (String) selector.getObject();
+  }
+
+  @Override
+  public void fold(ColumnValueSelector selector)
+  {
+    reset(selector); // folding overwrites the held value, exactly like a reset
+  }
+
+  @Override
+  @Nullable
+  public String getObject()
+  {
+    return latest;
+  }
+
+  @Override
+  public Class<String> classOfObject()
+  {
+    return String.class;
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
new file mode 100644
index 00000000000..85cd0dddb3e
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+/** On-heap {@code stringLast} aggregator: keeps the string with the greatest timestamp (later rows win ties). */
+public class StringLastAggregator implements Aggregator
+{
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final BaseLongColumnValueSelector timeSelector;
+  private final int maxStringBytes;
+
+  protected long lastTime;
+  protected String lastValue;
+
+  public StringLastAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.valueSelector = valueSelector;
+    this.timeSelector = timeSelector;
+    this.maxStringBytes = maxStringBytes;
+    // Start below every possible timestamp so the first aggregated row always wins.
+    lastTime = Long.MIN_VALUE;
+    lastValue = null;
+  }
+
+  @Override
+  public void aggregate()
+  {
+    long time = timeSelector.getLong();
+    Object value = valueSelector.getObject();
+    String candidate = null;
+    if (value instanceof SerializablePairLongString) {
+      // An already-aggregated pair carries its own timestamp; use it, as StringLastBufferAggregator does.
+      SerializablePairLongString pair = (SerializablePairLongString) value;
+      time = pair.lhs == null ? time : pair.lhs;
+      candidate = pair.rhs;
+    } else if (value instanceof String) {
+      candidate = (String) value;
+    } else if (value != null) {
+      throw new ISE(
+          "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
+          value.getClass().getCanonicalName()
+      );
+    }
+
+    if (time >= lastTime) {
+      lastTime = time;
+      // NOTE(review): the limit is applied to the character count, not to the UTF-8 byte count.
+      lastValue = candidate != null && candidate.length() > maxStringBytes
+                  ? candidate.substring(0, maxStringBytes)
+                  : candidate;
+    }
+  }
+
+  @Override
+  public Object get()
+  {
+    return new SerializablePairLongString(lastTime, lastValue);
+  }
+
+  @Override
+  public float getFloat()
+  {
+    throw new UnsupportedOperationException("StringLastAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong()
+  {
+    throw new UnsupportedOperationException("StringLastAggregator does not support getLong()");
+  }
+
+  @Override
+  public double getDouble()
+  {
+    throw new UnsupportedOperationException("StringLastAggregator does not support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
new file mode 100644
index 00000000000..cb4f3636612
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastAggregatorFactory.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import io.druid.query.aggregation.AggregateCombiner;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.first.StringFirstAggregatorFactory;
+import io.druid.query.cache.CacheKeyBuilder;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * Factory for the {@code stringLast} aggregator; the finalized result is the string of the latest pair.
+ */
+@JsonTypeName("stringLast")
+public class StringLastAggregatorFactory extends AggregatorFactory
+{
+  private final String fieldName;
+  private final String name;
+  protected final int maxStringBytes;
+
+  @JsonCreator
+  public StringLastAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+    Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+    this.name = name;
+    this.fieldName = fieldName;
+    this.maxStringBytes =
+        maxStringBytes == null ? StringFirstAggregatorFactory.DEFAULT_MAX_STRING_SIZE : maxStringBytes;
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    return new StringLastAggregator(
+        metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
+    );
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    return new StringLastBufferAggregator(
+        metricFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME),
+        metricFactory.makeColumnValueSelector(fieldName),
+        maxStringBytes
+    );
+  }
+
+  @Override
+  public Comparator getComparator()
+  {
+    return StringFirstAggregatorFactory.VALUE_COMPARATOR;
+  }
+
+  @Override
+  public Object combine(Object lhs, Object rhs)
+  {
+    // lhs is kept only when TIME_COMPARATOR ranks it strictly greater; otherwise (ties included) rhs.
+    return StringFirstAggregatorFactory.TIME_COMPARATOR.compare(lhs, rhs) > 0 ? lhs : rhs;
+  }
+
+  @Override
+  public AggregateCombiner makeAggregateCombiner()
+  {
+    return new StringLastAggregateCombiner();
+  }
+
+  @Override
+  public AggregatorFactory getCombiningFactory()
+  {
+    return new StringLastFoldingAggregatorFactory(name, name, maxStringBytes);
+  }
+
+  @Override
+  public List<AggregatorFactory> getRequiredColumns()
+  {
+    return Collections.singletonList(new StringLastAggregatorFactory(fieldName, fieldName, maxStringBytes));
+  }
+
+  @Override
+  public Object deserialize(Object object)
+  {
+    Map map = (Map) object;
+    return new SerializablePairLongString(((Number) map.get("lhs")).longValue(), ((String) map.get("rhs")));
+  }
+
+  @Override
+  public Object finalizeComputation(Object object)
+  {
+    return ((SerializablePairLongString) object).rhs;
+  }
+
+  @Override
+  @JsonProperty
+  public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty
+  public String getFieldName()
+  {
+    return fieldName;
+  }
+
+  @JsonProperty
+  public Integer getMaxStringBytes()
+  {
+    return maxStringBytes;
+  }
+
+  @Override
+  public List<String> requiredFields()
+  {
+    return Arrays.asList(Column.TIME_COLUMN_NAME, fieldName);
+  }
+
+  @Override
+  public byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(AggregatorUtil.STRING_LAST_CACHE_TYPE_ID)
+        .appendString(fieldName)
+        .appendInt(maxStringBytes)
+        .build();
+  }
+
+  @Override
+  public String getTypeName()
+  {
+    return "serializablePairLongString";
+  }
+
+  @Override
+  public int getMaxIntermediateSize()
+  {
+    return Long.BYTES + Integer.BYTES + maxStringBytes;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StringLastAggregatorFactory that = (StringLastAggregatorFactory) o;
+    return fieldName.equals(that.fieldName) && name.equals(that.name) && maxStringBytes == that.maxStringBytes;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "StringLastAggregatorFactory{" +
+           "name='" + name + '\'' +
+           ", fieldName='" + fieldName + '\'' +
+           ", maxStringBytes=" + maxStringBytes +
+           '}';
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
new file mode 100644
index 00000000000..12c99483a61
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastBufferAggregator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseLongColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+
+/**
+ * ByteBuffer-backed aggregator for {@code stringLast}. Each slot holds an 8-byte timestamp, a 4-byte
+ * string length and then the UTF-8 bytes of the string; a length of zero means "no string". At most
+ * {@code maxStringBytes} bytes of string are ever written to a slot. A row replaces the stored value
+ * when its timestamp is greater than or equal to the stored one.
+ */
+public class StringLastBufferAggregator implements BufferAggregator
+{
+  private final BaseLongColumnValueSelector timeSelector;
+  private final BaseObjectColumnValueSelector valueSelector;
+  private final int maxStringBytes;
+
+  public StringLastBufferAggregator(
+      BaseLongColumnValueSelector timeSelector,
+      BaseObjectColumnValueSelector valueSelector,
+      int maxStringBytes
+  )
+  {
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.maxStringBytes = maxStringBytes;
+  }
+
+  @Override
+  public void init(ByteBuffer buf, int position)
+  {
+    // Lowest possible timestamp and a zero length, so the first aggregated row always replaces it.
+    buf.putLong(position, Long.MIN_VALUE);
+    buf.putInt(position + Long.BYTES, 0);
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position)
+  {
+    Object value = valueSelector.getObject();
+    long time = timeSelector.getLong();
+    String lastString = null;
+
+    if (value instanceof SerializablePairLongString) {
+      // An already-aggregated pair brings its own timestamp; a missing one falls back to the row time.
+      SerializablePairLongString pair = (SerializablePairLongString) value;
+      time = pair.lhs == null ? time : pair.lhs;
+      lastString = pair.rhs;
+    } else if (value instanceof String) {
+      lastString = (String) value;
+    } else if (value != null) {
+      throw new ISE(
+          "Try to aggregate unsuported class type [%s].Supported class types: String or SerializablePairLongString",
+          value.getClass().getCanonicalName()
+      );
+    }
+
+    ByteBuffer mutationBuffer = buf.duplicate();
+    if (time < mutationBuffer.getLong(position)) {
+      return;
+    }
+
+    mutationBuffer.putLong(position, time);
+    if (lastString == null) {
+      mutationBuffer.putInt(position + Long.BYTES, 0);
+      return;
+    }
+
+    // Cheap pre-cut before encoding: UTF-8 never needs fewer bytes than the string has chars.
+    if (lastString.length() > maxStringBytes) {
+      lastString = lastString.substring(0, maxStringBytes);
+    }
+    byte[] valueBytes = StringUtils.toUtf8(lastString);
+    // Multi-byte characters can still exceed the limit; cap the bytes, backing up to a character boundary.
+    int length = Math.min(valueBytes.length, maxStringBytes);
+    while (length > 0 && length < valueBytes.length && (valueBytes[length] & 0xC0) == 0x80) {
+      length--;
+    }
+    mutationBuffer.putInt(position + Long.BYTES, length);
+    mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+    mutationBuffer.put(valueBytes, 0, length);
+  }
+
+  @Override
+  public Object get(ByteBuffer buf, int position)
+  {
+    ByteBuffer mutationBuffer = buf.duplicate();
+    long timeValue = mutationBuffer.getLong(position);
+    int stringSizeBytes = mutationBuffer.getInt(position + Long.BYTES);
+    if (stringSizeBytes <= 0) {
+      // A zero length is reported as a null string (so a stored empty string also reads back as null).
+      return new SerializablePairLongString(timeValue, null);
+    }
+
+    byte[] valueBytes = new byte[stringSizeBytes];
+    mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+    mutationBuffer.get(valueBytes, 0, stringSizeBytes);
+    return new SerializablePairLongString(timeValue, StringUtils.fromUtf8(valueBytes));
+  }
+
+  @Override
+  public float getFloat(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringLastBufferAggregator does not support getFloat()");
+  }
+
+  @Override
+  public long getLong(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringLastBufferAggregator does not support getLong()");
+  }
+
+  @Override
+  public double getDouble(ByteBuffer buf, int position)
+  {
+    throw new UnsupportedOperationException("StringLastBufferAggregator does not support getDouble()");
+  }
+
+  @Override
+  public void close()
+  {
+    // no resources to cleanup
+  }
+
+  @Override
+  public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+  {
+    inspector.visit("timeSelector", timeSelector);
+    inspector.visit("valueSelector", valueSelector);
+  }
+}
diff --git a/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
new file mode 100644
index 00000000000..9bd6a64488e
--- /dev/null
+++ b/processing/src/main/java/io/druid/query/aggregation/last/StringLastFoldingAggregatorFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.druid.java.util.common.StringUtils;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+
+import java.nio.ByteBuffer;
+
+/** Combining variant of {@code stringLast}: folds partial pairs, keeping the one with the greatest timestamp. */
+@JsonTypeName("stringLastFold")
+public class StringLastFoldingAggregatorFactory extends StringLastAggregatorFactory
+{
+  public StringLastFoldingAggregatorFactory(
+      @JsonProperty("name") String name,
+      @JsonProperty("fieldName") final String fieldName,
+      @JsonProperty("maxStringBytes") Integer maxStringBytes
+  )
+  {
+    super(name, fieldName, maxStringBytes);
+  }
+
+  @Override
+  public Aggregator factorize(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+    return new StringLastAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate()
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+        // A pair without a timestamp is skipped (unboxing it would throw), as in the buffered variant.
+        if (pair != null && pair.lhs != null && pair.lhs >= lastTime) {
+          lastTime = pair.lhs;
+          lastValue = pair.rhs;
+        }
+      }
+    };
+  }
+
+  @Override
+  public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
+  {
+    final BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(getName());
+    final int limit = maxStringBytes;
+    return new StringLastBufferAggregator(null, null, maxStringBytes)
+    {
+      @Override
+      public void aggregate(ByteBuffer buf, int position)
+      {
+        SerializablePairLongString pair = (SerializablePairLongString) selector.getObject();
+        // Slot layout: 8-byte timestamp, 4-byte string length, then the UTF-8 bytes of the string.
+        ByteBuffer mutationBuffer = buf.duplicate();
+        if (pair == null || pair.lhs == null || pair.lhs < mutationBuffer.getLong(position)) {
+          return;
+        }
+
+        mutationBuffer.putLong(position, pair.lhs);
+        byte[] valueBytes = pair.rhs == null ? new byte[0] : StringUtils.toUtf8(pair.rhs);
+        // Cap the payload at maxStringBytes (the string room reserved by getMaxIntermediateSize()).
+        int length = Math.min(valueBytes.length, limit);
+        while (length > 0 && length < valueBytes.length && (valueBytes[length] & 0xC0) == 0x80) {
+          length--; // back up so a multi-byte character is never split
+        }
+        mutationBuffer.putInt(position + Long.BYTES, length);
+        mutationBuffer.position(position + Long.BYTES + Integer.BYTES);
+        mutationBuffer.put(valueBytes, 0, length);
+      }
+
+      @Override
+      public void inspectRuntimeShape(RuntimeShapeInspector inspector)
+      {
+        inspector.visit("selector", selector);
+      }
+    };
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
new file mode 100644
index 00000000000..8f523c02ee5
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Tests the {@code stringFirst} factory, its heap and buffer aggregators, and its aggregate combiner.
+ */
+public class StringFirstAggregationTest
+{
+  private static final int MAX_STRING_SIZE = 1024;
+  private static final int ROWS_TO_AGGREGATE = 4;
+
+  private StringFirstAggregatorFactory stringFirstAggFactory;
+  private StringFirstAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector objectSelector;
+
+  // Within the first ROWS_TO_AGGREGATE rows, times[2] and pairs[3] carry the smallest timestamps.
+  private String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private long[] times = {8224, 6879, 2436, 3546, 7888};
+  private SerializablePairLongString[] pairs = {
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringFirstAggFactory = new StringFirstAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    combiningAggFactory = (StringFirstAggregatorFactory) stringFirstAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringLastAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) stringFirstAggFactory.factorize(colSelectorFactory);
+
+    for (int row = 0; row < ROWS_TO_AGGREGATE; row++) {
+      aggregate(agg);
+    }
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[2], result.rhs);
+  }
+
+  @Test
+  public void testStringLastBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) stringFirstAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    for (int row = 0; row < ROWS_TO_AGGREGATE; row++) {
+      aggregate(agg, buffer, 0);
+    }
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[2], result.rhs);
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+    // NOTE(review): pair2 has the later timestamp, yet this "first" test expects combine() to return
+    // it - confirm against StringFirstAggregatorFactory.combine that this is really intended.
+    Assert.assertEquals(pair2, stringFirstAggFactory.combine(pair1, pair2));
+  }
+
+  @Test
+  public void testStringLastCombiningAggregator()
+  {
+    StringFirstAggregator agg = (StringFirstAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+    for (int row = 0; row < ROWS_TO_AGGREGATE; row++) {
+      aggregate(agg);
+    }
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = pairs[3];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstCombiningBufferAggregator()
+  {
+    StringFirstBufferAggregator agg = (StringFirstBufferAggregator) combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringFirstAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    for (int row = 0; row < ROWS_TO_AGGREGATE; row++) {
+      aggregate(agg, buffer, 0);
+    }
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = pairs[3];
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringFirstAggregateCombiner()
+  {
+    final String[] values = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(values);
+
+    StringFirstAggregateCombiner combiner =
+        (StringFirstAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+    // reset() is expected to adopt the value the selector currently exposes
+    combiner.reset(columnSelector);
+    Assert.assertEquals(values[0], combiner.getObject());
+
+    // fold() of the next value is expected to leave the first one in place
+    columnSelector.increment();
+    combiner.fold(columnSelector);
+    Assert.assertEquals(values[0], combiner.getObject());
+
+    // a second reset() is expected to adopt the value the selector now exposes
+    combiner.reset(columnSelector);
+    Assert.assertEquals(values[1], combiner.getObject());
+  }
+
+  /** Aggregates once, then calls {@code increment()} on all three selectors. */
+  private void aggregate(StringFirstAggregator agg)
+  {
+    agg.aggregate();
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  /** Buffer flavour of the helper above: aggregates into {@code buff}, then increments the selectors. */
+  private void aggregate(StringFirstBufferAggregator agg, ByteBuffer buff, int position)
+  {
+    agg.aggregate(buff, position);
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
new file mode 100644
index 00000000000..8a4a0de986e
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringFirstBufferAggregatorTest
+{
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position); // aggregate the current row, then move both selectors to the next one
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+    // Five non-null rows: the value paired with the smallest timestamp (index 0) must be reported.
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L); // pre-fill the slot with unrelated data before init() runs
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    // compare timestamps as primitives instead of allocating boxed Longs
+    Assert.assertEquals("expected first string value", strings[0], sp.rhs);
+    Assert.assertEquals("first string timestamp is the smallest", timestamps[0], sp.lhs.longValue());
+
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+    // The first row's value is null; the expected result is row 1, which also has the smallest timestamp.
+    final long[] timestamps = {2222L, 1111L, 3333L, 4444L, 5555L};
+    final String[] strings = {null, "AAAA", "BBBB", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L); // pre-fill the slot with unrelated data before init() runs
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+    // compare timestamps as primitives instead of allocating boxed Longs
+    Assert.assertEquals("expected first string value", strings[1], sp.rhs);
+    Assert.assertEquals("first string timestamp is the smallest", timestamps[1], sp.lhs.longValue());
+
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+    // The value column holds Doubles rather than Strings; aggregating it is expected to throw.
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+    StringFirstAggregatorFactory factory = new StringFirstAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringFirstBufferAggregator agg = new StringFirstBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L); // pre-fill the slot with unrelated data before init() runs
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
new file mode 100644
index 00000000000..bac9a6dfcbc
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/first/StringFirstTimeseriesQueryTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.first;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringFirstTimeseriesQueryTest
+{
+  // Despite its name, this test runs a timeseries query with a stringFirst aggregator over an on-heap index.
+  @Test
+  public void testTopNWithDistinctCountAgg() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitorId = "visitor_id";
+    String clientType = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics( // one varargs call so that both metrics are passed together
+                    new CountAggregatorFactory("cnt"),
+                    new StringFirstAggregatorFactory("last_client_type", clientType, 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+    // two rows at "time" and one row an hour later; the earliest client_type value is "iphone"
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "0", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "1", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "0", clientType, "android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.allGran)
+                                  .intervals(QueryRunnerTestHelper.fullOnInterval)
+                                  .aggregators(
+                                      Lists.newArrayList(
+                                          new StringFirstAggregatorFactory(
+                                              "last_client_type", clientType, 1024
+                                          )
+                                      )
+                                  )
+                                  .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of("last_client_type", new SerializablePairLongString(timestamp, "iphone"))
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
new file mode 100644
index 00000000000..1f2ecc48152
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastAggregationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.java.util.common.Pair;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.column.Column;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class StringLastAggregationTest
+{
+  private static final int MAX_STRING_SIZE = 1024;
+  private StringLastAggregatorFactory stringLastAggFactory;
+  private StringLastAggregatorFactory combiningAggFactory;
+  private ColumnSelectorFactory colSelectorFactory;
+  private TestLongColumnSelector timeSelector;
+  private TestObjectColumnSelector<String> valueSelector;
+  private TestObjectColumnSelector<SerializablePairLongString> objectSelector;
+
+  private final String[] strings = {"1111", "2222", "3333", null, "4444"};
+  private final long[] times = {8224, 6879, 2436, 3546, 7888}; // times[0] is the largest of the first four
+  private final SerializablePairLongString[] pairs = { // pairs[2] has the largest lhs of the first four
+      new SerializablePairLongString(52782L, "AAAA"),
+      new SerializablePairLongString(65492L, "BBBB"),
+      new SerializablePairLongString(69134L, "CCCC"),
+      new SerializablePairLongString(11111L, "DDDD"),
+      new SerializablePairLongString(51223L, null)
+  };
+
+  @Before
+  public void setup()
+  {
+    stringLastAggFactory = new StringLastAggregatorFactory("billy", "nilly", MAX_STRING_SIZE);
+    combiningAggFactory = (StringLastAggregatorFactory) stringLastAggFactory.getCombiningFactory();
+    timeSelector = new TestLongColumnSelector(times);
+    valueSelector = new TestObjectColumnSelector<>(strings);
+    objectSelector = new TestObjectColumnSelector<>(pairs);
+    colSelectorFactory = EasyMock.createMock(ColumnSelectorFactory.class);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector(Column.TIME_COLUMN_NAME)).andReturn(timeSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("nilly")).andReturn(valueSelector);
+    EasyMock.expect(colSelectorFactory.makeColumnValueSelector("billy")).andReturn(objectSelector);
+    EasyMock.replay(colSelectorFactory);
+  }
+
+  @Test
+  public void testStringLastAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) stringLastAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg); // only the first four fixture rows are fed
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+
+    Assert.assertEquals(strings[0], result.rhs); // row 0 carries the largest timestamp of the rows fed
+  }
+
+  @Test
+  public void testStringLastBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) stringLastAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0); // only the first four fixture rows are fed
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+
+    Assert.assertEquals(strings[0], result.rhs); // row 0 carries the largest timestamp of the rows fed
+  }
+
+  @Test
+  public void testCombine()
+  {
+    SerializablePairLongString pair1 = new SerializablePairLongString(1467225000L, "AAAA");
+    SerializablePairLongString pair2 = new SerializablePairLongString(1467240000L, "BBBB");
+    Assert.assertEquals(pair2, stringLastAggFactory.combine(pair1, pair2)); // the later pair is expected to win
+  }
+
+  @Test
+  public void testStringLastCombiningAggregator()
+  {
+    StringLastAggregator agg = (StringLastAggregator) combiningAggFactory.factorize(colSelectorFactory);
+
+    aggregate(agg); // only the first four fixture pairs are fed
+    aggregate(agg);
+    aggregate(agg);
+    aggregate(agg);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get();
+    Pair<Long, String> expected = pairs[2]; // largest lhs among the pairs fed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastCombiningBufferAggregator()
+  {
+    StringLastBufferAggregator agg = (StringLastBufferAggregator) combiningAggFactory.factorizeBuffered(
+        colSelectorFactory);
+
+    ByteBuffer buffer = ByteBuffer.wrap(new byte[stringLastAggFactory.getMaxIntermediateSize()]);
+    agg.init(buffer, 0);
+
+    aggregate(agg, buffer, 0); // only the first four fixture pairs are fed
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+    aggregate(agg, buffer, 0);
+
+    Pair<Long, String> result = (Pair<Long, String>) agg.get(buffer, 0);
+    Pair<Long, String> expected = pairs[2]; // largest lhs among the pairs fed
+
+    Assert.assertEquals(expected.lhs, result.lhs);
+    Assert.assertEquals(expected.rhs, result.rhs);
+  }
+
+  @Test
+  public void testStringLastAggregateCombiner()
+  {
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    TestObjectColumnSelector columnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregateCombiner stringLastAggregateCombiner =
+        (StringLastAggregateCombiner) combiningAggFactory.makeAggregateCombiner();
+
+    stringLastAggregateCombiner.reset(columnSelector); // seed the combiner from the selector's first element
+
+    Assert.assertEquals(strings[0], stringLastAggregateCombiner.getObject());
+
+    columnSelector.increment(); // selector now points at strings[1]
+    stringLastAggregateCombiner.fold(columnSelector);
+
+    Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject()); // fold must adopt the newer value
+
+    stringLastAggregateCombiner.reset(columnSelector);
+
+    Assert.assertEquals(strings[1], stringLastAggregateCombiner.getObject());
+  }
+
+  private void aggregate(
+      StringLastAggregator agg
+  )
+  {
+    agg.aggregate(); // aggregate first, then step every shared selector forward for the next call
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+
+  private void aggregate(
+      StringLastBufferAggregator agg,
+      ByteBuffer buff,
+      int position
+  )
+  {
+    agg.aggregate(buff, position); // aggregate into buff at position, then advance every selector
+    timeSelector.increment();
+    valueSelector.increment();
+    objectSelector.increment();
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
new file mode 100644
index 00000000000..c7c125b67dd
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastBufferAggregatorTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.aggregation.TestLongColumnSelector;
+import io.druid.query.aggregation.TestObjectColumnSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class StringLastBufferAggregatorTest
+{
+  private void aggregateBuffer(
+      TestLongColumnSelector timeSelector,
+      TestObjectColumnSelector valueSelector,
+      BufferAggregator agg,
+      ByteBuffer buf,
+      int position
+  )
+  {
+    agg.aggregate(buf, position);
+    timeSelector.increment();
+    valueSelector.increment();
+  }
+
+  @Test
+  public void testBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {1526724600L, 1526724700L, 1526724800L, 1526725900L, 1526725000L};
+    final String[] strings = {"AAAA", "BBBB", "CCCC", "DDDD", "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+    Assert.assertEquals("expectec last string value", "DDDD", sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new Long(1526725900L), new Long(sp.lhs));
+
+  }
+
+  @Test
+  public void testNullBufferAggregate() throws Exception
+  {
+
+    final long[] timestamps = {1111L, 2222L, 6666L, 4444L, 5555L};
+    final String[] strings = {"CCCC", "AAAA", "BBBB", null, "EEEE"};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<String> objectColumnSelector = new TestObjectColumnSelector<>(strings);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+
+    SerializablePairLongString sp = ((SerializablePairLongString) agg.get(buf, position));
+
+
+    Assert.assertEquals("expectec last string value", strings[2], sp.rhs);
+    Assert.assertEquals("last string timestamp is the biggest", new Long(timestamps[2]), new Long(sp.lhs));
+
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoStringValue()
+  {
+
+    final long[] timestamps = {1526724000L, 1526724600L};
+    final Double[] doubles = {null, 2.00};
+    Integer maxStringBytes = 1024;
+
+    TestLongColumnSelector longColumnSelector = new TestLongColumnSelector(timestamps);
+    TestObjectColumnSelector<Double> objectColumnSelector = new TestObjectColumnSelector<>(doubles);
+
+    StringLastAggregatorFactory factory = new StringLastAggregatorFactory(
+        "billy", "billy", maxStringBytes
+    );
+
+    StringLastBufferAggregator agg = new StringLastBufferAggregator(
+        longColumnSelector,
+        objectColumnSelector,
+        maxStringBytes
+    );
+
+    String testString = "ZZZZ";
+
+    ByteBuffer buf = ByteBuffer.allocate(factory.getMaxIntermediateSize());
+    buf.putLong(1526728500L);
+    buf.putInt(testString.length());
+    buf.put(testString.getBytes(StandardCharsets.UTF_8));
+
+    int position = 0;
+
+    agg.init(buf, position);
+    //noinspection ForLoopReplaceableByForEach
+    for (int i = 0; i < timestamps.length; i++) {
+      aggregateBuffer(longColumnSelector, objectColumnSelector, agg, buf, position);
+    }
+  }
+}
diff --git a/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
new file mode 100644
index 00000000000..a68798e64be
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/last/StringLastTimeseriesQueryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.last;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.SerializablePairLongString;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexSchema;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+public class StringLastTimeseriesQueryTest
+{
+  // Despite its name, this test runs a timeseries query with a stringLast aggregator over an on-heap index.
+  @Test
+  public void testTopNWithDistinctCountAgg() throws Exception
+  {
+    TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+    String visitorId = "visitor_id";
+    String clientType = "client_type";
+
+    IncrementalIndex index = new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withQueryGranularity(Granularities.SECOND)
+                .withMetrics( // one varargs call so that both metrics are passed together
+                    new CountAggregatorFactory("cnt"),
+                    new StringLastAggregatorFactory("last_client_type", clientType, 1024)
+                )
+                .build()
+        )
+        .setMaxRowCount(1000)
+        .buildOnheap();
+
+    // two rows at "time" and one row an hour later; the latest client_type value is "android"
+    DateTime time = DateTimes.of("2016-03-04T00:00:00.000Z");
+    long timestamp = time.getMillis();
+
+    DateTime time1 = DateTimes.of("2016-03-04T01:00:00.000Z");
+    long timestamp1 = time1.getMillis();
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "0", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "1", clientType, "iphone")
+        )
+    );
+    index.add(
+        new MapBasedInputRow(
+            timestamp1,
+            Lists.newArrayList(visitorId, clientType),
+            ImmutableMap.<String, Object>of(visitorId, "0", clientType, "android")
+        )
+    );
+
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.allGran)
+                                  .intervals(QueryRunnerTestHelper.fullOnInterval)
+                                  .aggregators(
+                                      Lists.newArrayList(
+                                          new StringLastAggregatorFactory(
+                                              "last_client_type", clientType, 1024
+                                          )
+                                      )
+                                  )
+                                  .build();
+
+    final Iterable<Result<TimeseriesResultValue>> results =
+        engine.process(query, new IncrementalIndexStorageAdapter(index)).toList();
+
+    List<Result<TimeseriesResultValue>> expectedResults = Collections.singletonList(
+        new Result<>(
+            time,
+            new TimeseriesResultValue(
+                ImmutableMap.<String, Object>of(
+                    "last_client_type",
+                    new SerializablePairLongString(timestamp1, "android")
+                )
+            )
+        )
+    );
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org