Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/31 21:41:06 UTC

[incubator-pinot] branch master updated: Refactor DistinctQueriesTest to cover more scenarios (#5168)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 189f3b8  Refactor DistinctQueriesTest to cover more scenarios (#5168)
189f3b8 is described below

commit 189f3b821f6cc5894331baff779ddeec090a1368
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Mar 31 14:40:56 2020 -0700

    Refactor DistinctQueriesTest to cover more scenarios (#5168)
    
    - Test all data types (including BYTES)
    - Test empty responses
    - Test merging a non-empty response with an empty response on both the server side and the broker side
    
    TODO:
    - ORDER BY bytesColumn is not properly supported right now
    - DistinctTable limit is not properly set
---
 .../core/query/aggregation/DistinctTable.java      |    3 +-
 .../query/reduce/DistinctDataTableReducer.java     |    2 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  |    8 +-
 .../apache/pinot/queries/DistinctQueriesTest.java  | 1281 ++++++++------------
 4 files changed, 497 insertions(+), 797 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
index 4bf4628..6accdde 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Set;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
@@ -36,6 +35,7 @@ import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.data.table.BaseTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.spi.utils.BytesUtils;
 
 /**
  * This serves the following purposes:
@@ -188,6 +188,7 @@ public class DistinctTable extends BaseTable {
             break;
           case BYTES:
             columnValues[colIndex] = dataTable.getString(rowIndex, colIndex);
+            break;
           default:
             throw new IllegalStateException(
                 "Unexpected column data type " + columnDataType + " while deserializing data table for DISTINCT query");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 85a1450..e45366a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -108,7 +108,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
       if (mergedIntermediateResult == null) {
         mergedIntermediateResult = intermediateResultToMerge;
       } else {
-        _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
+        mergedIntermediateResult = _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
       }
     }
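The fix above is subtle: the assignment shows that AggregationFunction#merge returns the merged result, which is not guaranteed to be the same object as its first argument (merging an empty DistinctTable with a non-empty one, which this commit adds tests for, is exactly such a case). Discarding the return value can leave the reducer holding a stale intermediate result. A simplified sketch of the pattern, with plain Java sets standing in for intermediate results (not the actual Pinot API):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class MergeReduceDemo {
      // Stand-in for AggregationFunction#merge: returns the merged result,
      // which may be a different object than the first argument.
      static Set<Integer> merge(Set<Integer> a, Set<Integer> b) {
        if (a.isEmpty()) {
          return b; // an empty side is simply replaced by the other
        }
        a.addAll(b);
        return a;
      }

      public static void main(String[] args) {
        List<Set<Integer>> intermediates =
            Arrays.asList(new HashSet<>(), new HashSet<>(Arrays.asList(1, 2)));
        Set<Integer> merged = null;
        for (Set<Integer> intermediate : intermediates) {
          // Reassigning is the fix: ignoring the return value would leave
          // 'merged' pointing at the original (here, empty) set.
          merged = (merged == null) ? intermediate : merge(merged, intermediate);
        }
        System.out.println(merged); // prints [1, 2]
      }
    }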
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index a325a97..5d1dc8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -44,10 +44,10 @@ import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
  * Base class for queries tests.
  */
 public abstract class BaseQueriesTest {
-  private static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
-  private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
-  private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
-  private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
+  protected static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
+  protected static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+  protected static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+  protected static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
 
   protected abstract String getFilter();
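Widening these fields from private to protected lets query-test subclasses work with the compilers, plan maker and executor directly instead of only through the base-class helpers; the refactored DistinctQueriesTest imports BrokerRequest, DataTable and BrokerReduceService, suggesting it compiles queries and drives the broker-side reduce itself. A rough fragment of what a subclass can now do (sketch only; compileToBrokerRequest is assumed to be the compiler entry point):

    // Inside a subclass of BaseQueriesTest:
    BrokerRequest brokerRequest =
        PQL_COMPILER.compileToBrokerRequest("SELECT DISTINCT(intColumn) FROM testTable");
    // PLAN_MAKER and EXECUTOR_SERVICE are likewise visible for building and
    // executing instance-level query plans when a test needs finer control.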
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 2196b4a..647b68b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -23,16 +23,23 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -41,70 +48,69 @@ import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.query.aggregation.DistinctTable;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.pql.parsers.Pql2Compiler;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import org.testng.collections.Lists;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
- * Class to test DISTINCT queries.
- * Generates custom data set with explicitly generating
- * duplicate rows/keys
+ * Queries test for DISTINCT queries.
+ * TODO: Add a test for 'ORDER BY bytesColumn' when it is supported
  */
 public class DistinctQueriesTest extends BaseQueriesTest {
-  private static final int NUM_ROWS = 1_000_000;
-
-  private List<GenericRow> _rows = new ArrayList<>();
-
-  private static String D1 = "STRING_COL1";
-  private static String D2 = "STRING_COL2";
-  private static String M1 = "INT_COL";
-  private static String M2 = "LONG_COL";
-
-  // in the custom data set, each row is repeated after 20 rows
-  private static final int TUPLE_REPEAT_INTERVAL = 20;
-  // in the custom data set, each row is repeated 5 times, total 200k unique rows in dataset
-  private static final int PER_TUPLE_REPEAT_FREQUENCY = 5;
-  private static final int NUM_UNIQUE_TUPLES = NUM_ROWS / PER_TUPLE_REPEAT_FREQUENCY;
-
-  private static final int INT_BASE_VALUE = 10000;
-  private static final int INT_INCREMENT = 500;
-  private static final long LONG_BASE_VALUE = 100000000;
-  private static final long LONG_INCREMENT = 5500;
-
-  private static final String TABLE_NAME = "DistinctTestTable";
-  private static final int NUM_SEGMENTS = 2;
-  private static final String SEGMENT_NAME_1 = TABLE_NAME + "_100000000_200000000";
-  private static final String SEGMENT_NAME_2 = TABLE_NAME + "_300000000_400000000";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DistinctQueryTest");
-
-  private List<IndexSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS);
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME_PREFIX = "testSegment_";
+
+  private static final int NUM_RECORDS_PER_SEGMENT = 10000;
+  private static final int NUM_UNIQUE_RECORDS_PER_SEGMENT = 100;
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String BYTES_COLUMN = "bytesColumn";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
+
+  private IndexSegment _indexSegment;
   private List<SegmentDataManager> _segmentDataManagers;
-  private final Set<Record> _expectedAddTransformResults = new HashSet<>();
-  private final Set<Record> _expectedSubTransformResults = new HashSet<>();
-  private final Set<Record> _expectedAddSubTransformResults = new HashSet<>();
-  private final Set<Record> _expectedResults = new HashSet<>();
-  private final FieldSpec.DataType[] _dataTypes =
-      new FieldSpec.DataType[]{FieldSpec.DataType.STRING, FieldSpec.DataType.STRING, FieldSpec.DataType.INT, FieldSpec.DataType.LONG};
 
-  private Schema _schema;
+  @Override
+  protected String getFilter() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<SegmentDataManager> getSegmentDataManagers() {
+    return _segmentDataManagers;
+  }
 
   @BeforeClass
   public void setUp() {
-    Pql2Compiler.ENABLE_DISTINCT = true;
-    createPinotTableSchema();
-    createTestData();
+    FileUtils.deleteQuietly(INDEX_DIR);
   }
 
   @AfterClass
@@ -112,797 +118,490 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     FileUtils.deleteQuietly(INDEX_DIR);
   }
 
-  private void createPinotTableSchema() {
-    _schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
-            .addSingleValueDimension(D2, FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT)
-            .addMetric(M2, FieldSpec.DataType.LONG).build();
-  }
-
   /**
-   * Custom data generator that explicitly generates duplicate
-   * rows in dataset for purpose of testing DISTINCT functionality
+   * Helper method to generate records based on the given base value.
+   *
+   * All columns will have the same value but different data types (BYTES values are encoded STRING values).
+   * For the {i}th unique record, the value will be {baseValue + i}.
    */
-  private void createTestData() {
-    int pos = 0;
-    Object[] columnValues = new Object[_schema.size()];
-    for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
-      GenericRow row = new GenericRow();
-      double addition;
-      double subtraction;
-      int col = 0;
-      boolean duplicate = false;
-      for (FieldSpec.DataType dataType : _dataTypes) {
-        // generate each column for the row
-        Object value = null;
-        if (rowIndex == 0) {
-          switch (dataType) {
-            case INT:
-              value = INT_BASE_VALUE;
-              row.putField(M1, value);
-              break;
-            case LONG:
-              value = LONG_BASE_VALUE;
-              row.putField(M2, value);
-              break;
-            case STRING:
-              value = RandomStringUtils.randomAlphabetic(10);
-              if (col == 0) {
-                row.putField(D1, value);
-              } else {
-                row.putField(D2, value);
-              }
-              break;
-          }
-        } else {
-          if (rowIndex == pos + (TUPLE_REPEAT_INTERVAL * PER_TUPLE_REPEAT_FREQUENCY)) {
-            pos = rowIndex;
-          }
-          if (rowIndex < pos + TUPLE_REPEAT_INTERVAL) {
-            // generate unique row
-            switch (dataType) {
-              case INT:
-                value = (Integer) _rows.get(rowIndex - 1).getValue(M1) + INT_INCREMENT;
-                row.putField(M1, value);
-                break;
-              case LONG:
-                value = (Long) _rows.get(rowIndex - 1).getValue(M2) + LONG_INCREMENT;
-                row.putField(M2, value);
-                break;
-              case STRING:
-                value = RandomStringUtils.randomAlphabetic(10);
-                if (col == 0) {
-                  row.putField(D1, value);
-                } else {
-                  row.putField(D2, value);
-                }
-                break;
-            }
-          } else {
-            // generate duplicate row
-            duplicate = true;
-            switch (dataType) {
-              case INT:
-                value = _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(M1);
-                row.putField(M1, value);
-                break;
-              case LONG:
-                value = _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(M2);
-                row.putField(M2, value);
-                break;
-              case STRING:
-                if (col == 0) {
-                  row.putField(D1, _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(D1));
-                } else {
-                  row.putField(D2, _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(D2));
-                }
-                break;
-            }
-          }
-        }
-
-        columnValues[col++] = value;
-      }
-
-      // add the generated row
-      _rows.add(row);
-
-      // compute expected result for add and sub transform function
-      addition = ((Integer) columnValues[2]) + ((Long) columnValues[3]);
-      subtraction = ((Long) columnValues[3]) - ((Integer) columnValues[2]);
-
-      // compute expected result for multi column distinct
-      if (!duplicate) {
-        Record record = new Record(new Object[]{columnValues[0], columnValues[1], columnValues[2], columnValues[3]});
-        _expectedResults.add(record);
-      }
-
-      _expectedAddTransformResults.add(new Record(new Object[]{addition}));
-      _expectedSubTransformResults.add(new Record(new Object[]{subtraction}));
-      _expectedAddSubTransformResults.add(new Record(new Object[]{addition, subtraction}));
+  private List<GenericRow> generateRecords(int baseValue) {
+    List<GenericRow> uniqueRecords = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT);
+    for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+      int value = baseValue + i;
+      GenericRow record = new GenericRow();
+      record.putValue(INT_COLUMN, value);
+      record.putValue(LONG_COLUMN, (long) value);
+      record.putValue(FLOAT_COLUMN, (float) value);
+      record.putValue(DOUBLE_COLUMN, (double) value);
+      String stringValue = Integer.toString(value);
+      record.putValue(STRING_COLUMN, stringValue);
+      byte[] bytesValue = StringUtil.encodeUtf8(stringValue);
+      record.putValue(BYTES_COLUMN, bytesValue);
+      uniqueRecords.add(record);
     }
-  }
-
-  @Override
-  protected String getFilter() {
-    return "";
-  }
-
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegments.get(0);
-  }
 
-  @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS_PER_SEGMENT);
+    for (int i = 0; i < NUM_RECORDS_PER_SEGMENT; i += NUM_UNIQUE_RECORDS_PER_SEGMENT) {
+      records.addAll(uniqueRecords);
+    }
+    return records;
   }
 
-  private void createSegment(Schema schema, RecordReader recordReader, String segmentName, String tableName)
+  private ImmutableSegment createSegment(int index, List<GenericRow> records)
       throws Exception {
-    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(schema);
-    segmentGeneratorConfig.setTableName(tableName);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    String segmentName = SEGMENT_NAME_PREFIX + index;
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
     segmentGeneratorConfig.setSegmentName(segmentName);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
 
     SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, recordReader);
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records, SCHEMA));
     driver.build();
 
-    File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
-    if (!segmentIndexDir.exists()) {
-      throw new IllegalStateException("Segment generation failed");
-    }
-  }
-
-  private ImmutableSegment loadSegment(String segmentName)
-      throws Exception {
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
   }
 
   /**
-   * Test DISTINCT query with multiple columns on generated data set.
-   * All the generated dataset is put into a single segment
-   * and we directly run the {@link AggregationOperator} to
-   * get segment level execution results.
-   * The results are then compared to the expected result table
-   * that was built during data generation
-   * @throws Exception
+   * Test DISTINCT query within a single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   * </ul>
    */
   @Test
   public void testDistinctInnerSegment()
       throws Exception {
+    _indexSegment = createSegment(0, generateRecords(0));
     try {
-      // put all the generated dataset in a single segment
-      try (RecordReader recordReader = new GenericRowRecordReader(_rows, _schema)) {
-        createSegment(_schema, recordReader, SEGMENT_NAME_1, TABLE_NAME);
-        final ImmutableSegment immutableSegment = loadSegment(SEGMENT_NAME_1);
-        _indexSegments.add(immutableSegment);
-
-        // All 200k unique rows should be returned
+      {
+        // Test selecting all columns
         String query =
-            "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 1000000";
-        innerSegmentTestHelper(query, NUM_UNIQUE_TUPLES);
-
-        // All 200k unique rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 200000";
-        innerSegmentTestHelper(query, NUM_UNIQUE_TUPLES);
-
-        // 100k rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 100000";
-        innerSegmentTestHelper(query, 100000);
-
-        // default: 10 unique rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable";
-        innerSegmentTestHelper(query, 10);
-
-        // default: 10 unique rows should be returned
-        query = "SELECT DISTINCT(add(INT_COL,LONG_COL)) FROM DistinctTestTable";
-        innerSegmentTransformQueryTestHelper(query, 10, 1, new String[]{"add(INT_COL,LONG_COL)"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
-
-        // default: 10 unique rows should be returned
-        query = "SELECT DISTINCT(sub(LONG_COL,INT_COL)) FROM DistinctTestTable";
-        innerSegmentTransformQueryTestHelper(query, 10, 2, new String[]{"sub(LONG_COL,INT_COL)"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
-
-        // 100k unique rows should be returned
-        query = "SELECT DISTINCT(add(INT_COL,LONG_COL),sub(LONG_COL,INT_COL)) FROM DistinctTestTable LIMIT 100000 ";
-        innerSegmentTransformQueryTestHelper(query, 100000, 3,
-            new String[]{"add(INT_COL,LONG_COL)", "sub(LONG_COL,INT_COL)"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
+            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
+
+        // Check data schema
+        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DataSchema dataSchema = distinctTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(),
+            new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+        // Check values, where all 100 unique values should be returned
+        assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        Set<Integer> expectedValues = new HashSet<>();
+        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+          expectedValues.add(i);
+        }
+        Set<Integer> actualValues = new HashSet<>();
+        Iterator<Record> iterator = distinctTable.iterator();
+        while (iterator.hasNext()) {
+          Record record = iterator.next();
+          Object[] values = record.getValues();
+          int intValue = (int) values[0];
+          assertEquals(((Long) values[1]).intValue(), intValue);
+          assertEquals(((Float) values[2]).intValue(), intValue);
+          assertEquals(((Double) values[3]).intValue(), intValue);
+          assertEquals(Integer.parseInt((String) values[4]), intValue);
+          assertEquals(StringUtil.decodeUtf8((byte[]) values[5]), values[4]);
+          actualValues.add(intValue);
+        }
+        assertEquals(actualValues, expectedValues);
       }
-    } finally {
-      destroySegments();
-    }
-  }
-
-  /**
-   * Helper for inner segment query tests
-   * @param query query to run
-   * @param expectedSize expected result size
-   */
-  private void innerSegmentTestHelper(final String query, final int expectedSize) {
-    // compile to broker request and directly run the operator
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
-    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    List<Object> operatorResult = resultsBlock.getAggregationResult();
-
-    // verify resultset
-    Assert.assertNotNull(operatorResult);
-    Assert.assertEquals(operatorResult.size(), 1);
-    Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
-    DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
-    Assert.assertEquals(_expectedResults.size(), NUM_UNIQUE_TUPLES);
-    Assert.assertEquals(distinctTable.size(), expectedSize);
-
-    DataSchema dataSchema = distinctTable.getDataSchema();
-    Assert.assertEquals(dataSchema.getColumnNames(), new String[]{D1, D2, M1, M2});
-    Assert.assertEquals(dataSchema.getColumnDataTypes(),
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG});
-
-    Iterator<Record> iterator = distinctTable.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      Assert.assertEquals(record.getValues().length, 4);
-      Assert.assertTrue(_expectedResults.contains(record));
-    }
-  }
-
-  /**
-   * Helper for inner segment transform query tests
-   * @param query query to run
-   * @param expectedSize expected result size
-   */
-  private void innerSegmentTransformQueryTestHelper(final String query, final int expectedSize, final int op,
-      final String[] columnNames, final DataSchema.ColumnDataType[] columnTypes) {
-    // compile to broker request and directly run the operator
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
-    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    final List<Object> operatorResult = resultsBlock.getAggregationResult();
-
-    // verify resultset
-    Assert.assertNotNull(operatorResult);
-    Assert.assertEquals(operatorResult.size(), 1);
-    Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
-    final DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
-    Assert.assertEquals(distinctTable.size(), expectedSize);
-
-    DataSchema dataSchema = distinctTable.getDataSchema();
-    Assert.assertEquals(dataSchema.getColumnNames(), columnNames);
-    Assert.assertEquals(dataSchema.getColumnDataTypes(), columnTypes);
-
-    Iterator<Record> iterator = distinctTable.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      Assert.assertEquals(record.getValues().length, columnNames.length);
-      if (op == 1) {
-        Assert.assertTrue(_expectedAddTransformResults.contains(record));
-      } else if (op == 2) {
-        Assert.assertTrue(_expectedSubTransformResults.contains(record));
-      } else {
-        Assert.assertTrue(_expectedAddSubTransformResults.contains(record));
+      {
+        // Test selecting some columns with filter
+        String query =
+            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+
+        // Check data schema
+        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DataSchema dataSchema = distinctTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+        // Check values, where 40 matched values should be returned
+        assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+        Set<Integer> expectedValues = new HashSet<>();
+        for (int i = 60; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+          expectedValues.add(i);
+        }
+        Set<Integer> actualValues = new HashSet<>();
+        Iterator<Record> iterator = distinctTable.iterator();
+        while (iterator.hasNext()) {
+          Record record = iterator.next();
+          Object[] values = record.getValues();
+          int intValue = Integer.parseInt((String) values[0]);
+          assertEquals(StringUtil.decodeUtf8((byte[]) values[1]), values[0]);
+          assertEquals(((Float) values[2]).intValue(), intValue);
+          actualValues.add(intValue);
+        }
+        assertEquals(actualValues, expectedValues);
       }
-    }
-  }
-
-  /**
-   * Test DISTINCT query with multiple columns on generated data set.
-   * The generated dataset is divided into two segments.
-   * We exercise the entire execution from broker ->
-   * server -> segment. The server combines the results
-   * from segments and sends the data table to broker.
-   *
-   * Currently the base class mimics the broker level
-   * execution by duplicating the data table to mimic
-   * two servers and then doing the merge
-   *
-   * The results are then compared to the expected result table
-   * that was built during data generation
-   * @throws Exception
-   */
-  @Test(dependsOnMethods = {"testDistinctInnerSegment"})
-  public void testDistinctInterSegmentInterServer()
-      throws Exception {
-    try {
-      // divide the generated dataset into 2 parts and create 2 segments
-      final List<GenericRow> randomRows = new ArrayList<>();
-      final List<GenericRow> copiedRows = new ArrayList<>(_rows);
-      final int size = copiedRows.size();
-      for (int row = size - 1; row >= size / 2; row--) {
-        randomRows.add(copiedRows.remove(row));
+      {
+        // Test selecting some columns with transform, filter, order-by and limit
+        String query =
+            "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+
+        // Check data schema
+        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DataSchema dataSchema = distinctTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+        // Check values, where 60 matched values should be returned (limit won't take effect on server side)
+        assertEquals(distinctTable.size(), 60);
+        Set<Integer> expectedValues = new HashSet<>();
+        for (int i = 0; i < 60; i++) {
+          expectedValues.add(i);
+        }
+        Set<Integer> actualValues = new HashSet<>();
+        Iterator<Record> iterator = distinctTable.iterator();
+        while (iterator.hasNext()) {
+          Record record = iterator.next();
+          Object[] values = record.getValues();
+          int intValue = ((Double) values[0]).intValue() / 2;
+          assertEquals(Integer.parseInt((String) values[1]), intValue);
+          actualValues.add(intValue);
+        }
+        assertEquals(actualValues, expectedValues);
       }
-
-      try (RecordReader recordReader1 = new GenericRowRecordReader(copiedRows, _schema);
-          RecordReader recordReader2 = new GenericRowRecordReader(randomRows, _schema)) {
-        createSegment(_schema, recordReader1, SEGMENT_NAME_1, TABLE_NAME);
-        createSegment(_schema, recordReader2, SEGMENT_NAME_2, TABLE_NAME);
-        final ImmutableSegment segment1 = loadSegment(SEGMENT_NAME_1);
-        final ImmutableSegment segment2 = loadSegment(SEGMENT_NAME_2);
-
-        _indexSegments.add(segment1);
-        _indexSegments.add(segment2);
-        _segmentDataManagers =
-            Arrays.asList(new ImmutableSegmentDataManager(segment1), new ImmutableSegmentDataManager(segment2));
-
-        // All 200k unique rows should be returned
+      {
+        // Test selecting some columns with filter that does not match any record
         String query =
-            "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 1000000";
-        interSegmentInterServerTestHelper(query, NUM_UNIQUE_TUPLES);
-
-        // All 200k unique rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 200000";
-        interSegmentInterServerTestHelper(query, NUM_UNIQUE_TUPLES);
+            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
 
-        // 100k unique rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 100000";
-        interSegmentInterServerTestHelper(query, 100000);
+        // Check data schema, where data type should be STRING for all columns
+        DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+        DataSchema dataSchema = distinctTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
 
-        // Default: 10 unique rows should be returned
-        query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable";
-        interSegmentInterServerTestHelper(query, 10);
+        // Check values, where no record should be returned
+        assertEquals(distinctTable.size(), 0);
       }
     } finally {
-      destroySegments();
+      _indexSegment.destroy();
     }
   }
 
   /**
-   * Helper for inter segment, inter server query tests
-   * @param query query to run
-   * @param expectedSize expected result size
+   * Helper method to get the DistinctTable result for one single segment for the given query.
    */
-  private void interSegmentInterServerTestHelper(String query, int expectedSize) {
-    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-    final SelectionResults selectionResults = brokerResponse.getSelectionResults();
-
-    Assert.assertEquals(selectionResults.getColumns().size(), 4);
-
-    Assert.assertEquals(selectionResults.getColumns().get(0), D1);
-    Assert.assertEquals(selectionResults.getColumns().get(1), D2);
-    Assert.assertEquals(selectionResults.getColumns().get(2), M1);
-    Assert.assertEquals(selectionResults.getColumns().get(3), M2);
-
-    Assert.assertEquals(_expectedResults.size(), NUM_UNIQUE_TUPLES);
-    Assert.assertEquals(selectionResults.getRows().size(), expectedSize);
-
-    for (Serializable[] row : selectionResults.getRows()) {
-      Assert.assertEquals(row.length, 4);
-      Record record = new Record(row);
-      Assert.assertTrue(_expectedResults.contains(record));
-    }
+  private DistinctTable getDistinctTableInnerSegment(String query) {
+    AggregationOperator aggregationOperator = getOperatorForQuery(query);
+    List<Object> aggregationResult = aggregationOperator.nextBlock().getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
+    assertTrue(aggregationResult.get(0) instanceof DistinctTable);
+    return (DistinctTable) aggregationResult.get(0);
   }
 
   /**
-   * Test DISTINCT queries on multiple columns with FILTER.
-   * A simple hand-written data set of 10 rows in a single segment
-   * is used for FILTER based queries as opposed to generated data set.
-   * The results are compared to expected table.
-   *
-   * Runs 4 different queries with predicates.
-   * @throws Exception
+   * Test DISTINCT query across multiple segments and servers (2 servers, each with 2 segments).
+   * <p>Both PQL and SQL format are tested.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one segment but matches some records in
+   *     the other segment
+   *   </li>
+   *   <li>
+   *     Selecting some columns with filter that does not match any record in one server but matches some records in the
+   *     other server
+   *   </li>
+   * </ul>
    */
-  @Test(dependsOnMethods = {"testDistinctInterSegmentInterServer"})
-  public void testDistinctWithFilter()
+  @Test
+  public void testDistinctInterSegment()
       throws Exception {
+    ImmutableSegment segment0 = createSegment(0, generateRecords(0));
+    ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
+    _segmentDataManagers =
+        Arrays.asList(new ImmutableSegmentDataManager(segment0), new ImmutableSegmentDataManager(segment1));
     try {
-      String tableName = TABLE_NAME + "WithFilter";
-
-      Schema schema = new Schema.SchemaBuilder().setSchemaName(tableName)
-          .addSingleValueDimension("State", FieldSpec.DataType.STRING)
-          .addSingleValueDimension("City", FieldSpec.DataType.STRING).addMetric("SaleAmount", FieldSpec.DataType.INT)
-          .build();
-
-      String query1 = "SELECT DISTINCT(State, City) FROM " + tableName + " WHERE SaleAmount >= 200000";
-      String query2 = "SELECT DISTINCT(State, City) FROM " + tableName + " WHERE SaleAmount >= 400000";
-      String query3 = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount >= 200000";
-      String query4 = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount >= 400000";
-
-      Set<Record> q1ExpectedResults = new HashSet<>();
-      Set<Record> q2ExpectedResults = new HashSet<>();
-      Set<Record> q3ExpectedResults = new HashSet<>();
-      Set<Record> q4ExpectedResults = new HashSet<>();
-
-      List<GenericRow> rows =
-          createSimpleTable(q1ExpectedResults, q2ExpectedResults, q3ExpectedResults, q4ExpectedResults);
-
-      try (RecordReader recordReader = new GenericRowRecordReader(rows, schema)) {
-        createSegment(schema, recordReader, SEGMENT_NAME_1, tableName);
-        final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
-        _indexSegments.add(segment);
-        _segmentDataManagers =
-            Arrays.asList(new ImmutableSegmentDataManager(segment), new ImmutableSegmentDataManager(segment));
-
-        // without ORDER BY
-        runFilterQueryInnerSegment(q1ExpectedResults, query1, new String[]{"State", "City"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
-        runFilterQueryInnerSegment(q2ExpectedResults, query2, new String[]{"State", "City"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
-        runFilterQueryInnerSegment(q3ExpectedResults, query3, new String[]{"State", "City", "SaleAmount"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
-        runFilterQueryInnerSegment(q4ExpectedResults, query4, new String[]{"State", "City", "SaleAmount"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
-
-        // with ORDER BY ASC/DESC
-        String orderByQuery = query1 + " ORDER BY State, City LIMIT 100";
-        List<Record> sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query1 + " ORDER BY State, City LIMIT 2";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query1 + " ORDER BY State, City DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query1 + " ORDER BY State, City DESC LIMIT 2";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query2 + " ORDER BY State, City LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query2 + " ORDER BY State, City LIMIT 1";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query2 + " ORDER BY State, City DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query2 + " ORDER BY State, City DESC LIMIT 1";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
-        orderByQuery = query3 + " ORDER BY State, City, SaleAmount LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query3 + " ORDER BY State, City, SaleAmount LIMIT 3";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query3 + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query3 + " ORDER BY State, City, SaleAmount DESC LIMIT 3";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query4 + " ORDER BY State, City, SaleAmount LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query4 + " ORDER BY State, City, SaleAmount LIMIT 2";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query4 + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = query4 + " ORDER BY State, City, SaleAmount DESC LIMIT 2";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
-            + " ORDER BY State, City, SaleAmount LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery =
-            "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " ORDER BY State, City, SaleAmount LIMIT 10";
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery =
-            "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " ORDER BY State, City, SaleAmount LIMIT 5";
-        sortedResults = sortedResults.subList(0, 5);
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
-            + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
-            + " ORDER BY State, City, SaleAmount DESC LIMIT 10";
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
-            + " ORDER BY State, City, SaleAmount DESC LIMIT 5";
-        sortedResults = sortedResults.subList(0, 5);
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
-            + " ORDER BY State DESC, City, SaleAmount DESC LIMIT 100";
-        sortedResults = new ArrayList<>();
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
-        sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
-        sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
-        sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
-        sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
-        sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
-        runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
-        String emptyResultQuery =
-            "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount = 0";
-        // All column data types should be STRING for empty result
-        runFilterQueryInnerSegment(Collections.emptySet(), emptyResultQuery,
-            new String[]{"State", "City", "SaleAmount"},
-            new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
-        runQueryInterSegmentWithOrderBy(emptyResultQuery, Collections.emptyList(),
-            new String[]{"State", "City", "SaleAmount"});
-        emptyResultQuery += " ORDER BY State, City LIMIT 100";
-        runQueryInterSegmentWithOrderBy(emptyResultQuery, Collections.emptyList(),
-            new String[]{"State", "City", "SaleAmount"});
+      {
+        // Test selecting all columns
+        String pqlQuery =
+            "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
+        String sqlQuery =
+            "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable LIMIT 10000";
+
+        // Check data schema
+        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(),
+            Arrays.asList("intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"));
+        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(),
+            new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+        // Check values, where all 200 unique values should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+        Set<Integer> expectedValues = new HashSet<>();
+        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+          expectedValues.add(i);
+          expectedValues.add(1000 + i);
+        }
+        Set<Integer> pqlValues = new HashSet<>();
+        for (Serializable[] row : pqlRows) {
+          int intValue = (int) row[0];
+          assertEquals(((Long) row[1]).intValue(), intValue);
+          assertEquals(((Float) row[2]).intValue(), intValue);
+          assertEquals(((Double) row[3]).intValue(), intValue);
+          assertEquals(Integer.parseInt((String) row[4]), intValue);
+          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
+          pqlValues.add(intValue);
+        }
+        assertEquals(pqlValues, expectedValues);
+        Set<Integer> sqlValues = new HashSet<>();
+        for (Object[] row : sqlRows) {
+          int intValue = (int) row[0];
+          assertEquals(((Long) row[1]).intValue(), intValue);
+          assertEquals(((Float) row[2]).intValue(), intValue);
+          assertEquals(((Double) row[3]).intValue(), intValue);
+          assertEquals(Integer.parseInt((String) row[4]), intValue);
+          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
+          sqlValues.add(intValue);
+        }
+        assertEquals(sqlValues, expectedValues);
+      }
+      {
+        // Test selecting some columns with filter
+        String pqlQuery =
+            "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+        String sqlQuery =
+            "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+
+        // Check data schema
+        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(), Arrays.asList("stringColumn", "bytesColumn", "floatColumn"));
+        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+        // Check values, where 140 matched values should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
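+        // intColumn >= 60 filters out values [0, 60) from segment0, while all values in segment1 match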
+        Set<Integer> expectedValues = new HashSet<>();
+        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+          if (i >= 60) {
+            expectedValues.add(i);
+          }
+          expectedValues.add(1000 + i);
+        }
+        Set<Integer> pqlValues = new HashSet<>();
+        for (Serializable[] row : pqlRows) {
+          int intValue = Integer.parseInt((String) row[0]);
+          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
+          assertEquals(((Float) row[2]).intValue(), intValue);
+          pqlValues.add(intValue);
+        }
+        assertEquals(pqlValues, expectedValues);
+        Set<Integer> sqlValues = new HashSet<>();
+        for (Object[] row : sqlRows) {
+          int intValue = Integer.parseInt((String) row[0]);
+          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
+          assertEquals(((Float) row[2]).intValue(), intValue);
+          sqlValues.add(intValue);
+        }
+        assertEquals(sqlValues, expectedValues);
+      }
+      {
+        // Test selecting some columns with transform, filter, order-by and limit
+        String pqlQuery =
+            "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+        String sqlQuery =
+            "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+
+        // Check data schema
+        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(), Arrays.asList("add(intColumn,floatColumn)", "stringColumn"));
+        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+        // Check values, where only the top 10 values sorted in descending string order should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertEquals(pqlRows.size(), 10);
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertEquals(sqlRows.size(), 10);
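+        // stringColumn DESC compares lexicographically: "9" > "8" > "7" > "6" > "59" > "58" > ... > "54"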
+        int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
+        for (int i = 0; i < 10; i++) {
+          Serializable[] row = pqlRows.get(i);
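+          // ADD(intColumn, floatColumn) evaluates to 2 * value because both columns store the same value per record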
+          int intValue = ((Double) row[0]).intValue() / 2;
+          assertEquals(intValue, expectedValues[i]);
+          assertEquals(Integer.parseInt((String) row[1]), intValue);
+        }
+        for (int i = 0; i < 10; i++) {
+          Object[] row = sqlRows.get(i);
+          int intValue = ((Double) row[0]).intValue() / 2;
+          assertEquals(intValue, expectedValues[i]);
+          assertEquals(Integer.parseInt((String) row[1]), intValue);
+        }
+      }
+      {
+        // Test selecting some columns with filter that does not match any record
+        String pqlQuery =
+            "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+        String sqlQuery =
+            "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+
+        // Check data schema, where the data type should be STRING for all columns because the schema cannot be
+        // inferred when no record matches
+        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(), Arrays.asList("floatColumn", "longColumn"));
+        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(),
+            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+
+        // Check values, where no record should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertTrue(pqlRows.isEmpty());
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertTrue(sqlRows.isEmpty());
+      }
+      {
+        // Test selecting some columns with filter that does not match any record in one segment but matches some
+        // records in the other segment
+        String pqlQuery =
+            "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
+        String sqlQuery =
+            "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
+
+        // Check data schema
+        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(), Collections.singletonList("intColumn"));
+        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT});
+
+        // Check values, where only the top 5 values sorted in ascending int order should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertEquals(pqlRows.size(), 5);
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertEquals(sqlRows.size(), 5);
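+        // floatColumn > 200 matches only segment1, so the 5 smallest matching values are 1000 through 1004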
+        int[] expectedValues = new int[]{1000, 1001, 1002, 1003, 1004};
+        for (int i = 0; i < 5; i++) {
+          Serializable[] row = pqlRows.get(i);
+          assertEquals((int) row[0], expectedValues[i]);
+        }
+        for (int i = 0; i < 5; i++) {
+          Object[] row = sqlRows.get(i);
+          assertEquals((int) row[0], expectedValues[i]);
+        }
+      }
+      {
+        // Test selecting some columns with a filter that does not match any record on one server but matches some
+        // records on the other server
+        String pqlQuery =
+            "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
+        String sqlQuery =
+            "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
+
+        BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+        BrokerResponseNative pqlResponse = queryServersWithDifferentSegments(pqlBrokerRequest, segment0, segment1);
+        BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+        sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
+        BrokerResponseNative sqlResponse = queryServersWithDifferentSegments(sqlBrokerRequest, segment0, segment1);
+
+        // Check data schema
+        SelectionResults selectionResults = pqlResponse.getSelectionResults();
+        assertNotNull(selectionResults);
+        assertEquals(selectionResults.getColumns(), Collections.singletonList("longColumn"));
+        ResultTable resultTable = sqlResponse.getResultTable();
+        assertNotNull(resultTable);
+        DataSchema dataSchema = resultTable.getDataSchema();
+        assertEquals(dataSchema.getColumnNames(), new String[]{"longColumn"});
+        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.LONG});
+
+        // Check values, where only the top 5 values sorted in descending long order should be returned
+        List<Serializable[]> pqlRows = selectionResults.getRows();
+        assertEquals(pqlRows.size(), 5);
+        List<Object[]> sqlRows = resultTable.getRows();
+        assertEquals(sqlRows.size(), 5);
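+        // doubleColumn < 200 matches only segment0, so the 5 largest matching values are 99 through 95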
+        int[] expectedValues = new int[]{99, 98, 97, 96, 95};
+        for (int i = 0; i < 5; i++) {
+          Serializable[] row = pqlRows.get(i);
+          assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
+        }
+        for (int i = 0; i < 5; i++) {
+          Object[] row = sqlRows.get(i);
+          assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
+        }
       }
     } finally {
-      destroySegments();
-    }
-  }
-
-  /**
-   * Helper for testing filter queries
-   * @param expectedTable expected result set
-   * @param query query to run
-   * @param columnNames name of columns
-   * @param types data types
-   */
-  private void runFilterQueryInnerSegment(final Set<Record> expectedTable, final String query, String[] columnNames,
-      DataSchema.ColumnDataType[] types) {
-    AggregationOperator aggregationOperator = getOperatorForQuery(query);
-    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    List<Object> operatorResult = resultsBlock.getAggregationResult();
-
-    Assert.assertNotNull(operatorResult);
-    Assert.assertEquals(operatorResult.size(), 1);
-    Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
-    DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
-    Assert.assertEquals(distinctTable.size(), expectedTable.size());
-
-    DataSchema dataSchema = distinctTable.getDataSchema();
-    Assert.assertEquals(dataSchema.getColumnNames(), columnNames);
-    Assert.assertEquals(dataSchema.getColumnDataTypes(), types);
-
-    Iterator<Record> iterator = distinctTable.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      Assert.assertEquals(record.getValues().length, columnNames.length);
-      Assert.assertTrue(expectedTable.contains(record));
-    }
-  }
-
-  private void runQueryInterSegmentWithOrderBy(String query, List<Record> orderedResults, String[] columnNames) {
-    BrokerResponseNative brokerResponseNative = getBrokerResponseForPqlQuery(query);
-    final SelectionResults selectionResults = brokerResponseNative.getSelectionResults();
-    Assert.assertEquals(selectionResults.getColumns(), Lists.newArrayList(columnNames));
-    List<Serializable[]> rows = selectionResults.getRows();
-    Assert.assertEquals(rows.size(), orderedResults.size());
-    int counter = 0;
-    for (Serializable[] row : rows) {
-      Assert.assertEquals(row.length, columnNames.length);
-      Record actualRecord = new Record(row);
-      Assert.assertEquals(actualRecord, orderedResults.get(counter++));
+      for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+        segmentDataManager.destroy();
+      }
     }
   }
 
   /**
-   * Create a segment with simple table of (State, City, SaleAmount, Time)
-   * @param q1ExpectedResults expected results of filter query 1
-   * @param q2ExpectedResults expected results of filter query 1
-   * @param q3ExpectedResults expected results of filter query 1
-   * @param q4ExpectedResults expected results of filter query 1
-   * @return list of generic rows
+   * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
+   * 2 copies of segment1.
    */
-  private List<GenericRow> createSimpleTable(final Set<Record> q1ExpectedResults, final Set<Record> q2ExpectedResults,
-      final Set<Record> q3ExpectedResults, final Set<Record> q4ExpectedResults) {
-    int numRows = 10;
-    List<GenericRow> rows = new ArrayList<>(numRows);
-    Object[] columns;
-
-    // ROW 1
-    GenericRow row = new GenericRow();
-    columns = new Object[]{"California", "San Mateo", 500000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    Record record = new Record(new Object[]{columns[0], columns[1]});
-    q1ExpectedResults.add(record);
-    q2ExpectedResults.add(record);
-    record = new Record(new Object[]{columns[0], columns[1], columns[2]});
-    q3ExpectedResults.add(record);
-    q4ExpectedResults.add(record);
-
-    // ROW 2
-    row = new GenericRow();
-    columns = new Object[]{"California", "San Mateo", 400000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    record = new Record(new Object[]{columns[0], columns[1], columns[2]});
-    q3ExpectedResults.add(record);
-    q4ExpectedResults.add(record);
-
-    // ROW 3
-    row = new GenericRow();
-    columns = new Object[]{"California", "Sunnyvale", 300000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    record = new Record(new Object[]{columns[0], columns[1]});
-    q1ExpectedResults.add(record);
-    record = new Record(new Object[]{columns[0], columns[1], columns[2]});
-    q3ExpectedResults.add(record);
-
-    // ROW 4
-    row = new GenericRow();
-    columns = new Object[]{"California", "Sunnyvale", 300000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 5
-    row = new GenericRow();
-    columns = new Object[]{"California", "Mountain View", 700000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    record = new Record(new Object[]{columns[0], columns[1]});
-    q1ExpectedResults.add(record);
-    q2ExpectedResults.add(record);
-    record = new Record(new Object[]{columns[0], columns[1], columns[2]});
-    q3ExpectedResults.add(record);
-    q4ExpectedResults.add(record);
-
-    // ROW 6
-    row = new GenericRow();
-    columns = new Object[]{"California", "Mountain View", 700000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 7
-    row = new GenericRow();
-    columns = new Object[]{"California", "Mountain View", 200000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    record = new Record(new Object[]{columns[0], columns[1], columns[2]});
-    q3ExpectedResults.add(record);
-
-    // ROW 8
-    row = new GenericRow();
-    columns = new Object[]{"Washington", "Seattle", 100000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 9
-    row = new GenericRow();
-    columns = new Object[]{"Washington", "Bellevue", 100000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 10
-    row = new GenericRow();
-    columns = new Object[]{"Oregon", "Portland", 50000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 11
-    row = new GenericRow();
-    columns = new Object[]{"Washington", "Bellevue", 150000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    // ROW 12
-    row = new GenericRow();
-    columns = new Object[]{"Oregon", "Portland", 100000};
-    row.putField("State", columns[0]);
-    row.putField("City", columns[1]);
-    row.putField("SaleAmount", columns[2]);
-    rows.add(row);
-
-    return rows;
-  }
-
-  private void destroySegments() {
-    for (IndexSegment indexSegment : _indexSegments) {
-      if (indexSegment != null) {
-        indexSegment.destroy();
-      }
-    }
-    _indexSegments.clear();
+  private BrokerResponseNative queryServersWithDifferentSegments(BrokerRequest brokerRequest, ImmutableSegment segment0,
+      ImmutableSegment segment1) {
+    List<SegmentDataManager> segmentDataManagers0 =
+        Arrays.asList(new ImmutableSegmentDataManager(segment0), new ImmutableSegmentDataManager(segment0));
+    List<SegmentDataManager> segmentDataManagers1 =
+        Arrays.asList(new ImmutableSegmentDataManager(segment1), new ImmutableSegmentDataManager(segment1));
+
+    // Server side
+    DataTable instanceResponse0 = PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers0, brokerRequest, EXECUTOR_SERVICE,
+        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+    DataTable instanceResponse1 = PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers1, brokerRequest, EXECUTOR_SERVICE,
+        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+
+    // Broker side
+    BrokerReduceService brokerReduceService = new BrokerReduceService();
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
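+    // Same host and port but different table types, so the two entries are distinct map keys, simulating two servers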
+    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), instanceResponse0);
+    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), instanceResponse1);
+    return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap, null);
   }
 }

