You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/19 01:24:21 UTC

[GitHub] fjy closed pull request #6642: SystemSchema: Fix data types for various fields.

fjy closed pull request #6642: SystemSchema: Fix data types for various fields.
URL: https://github.com/apache/incubator-druid/pull/6642
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the request is merged):

diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index ae8ba9460fa..a004e80b810 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.inject.Inject;
 import org.apache.calcite.DataContext;
@@ -50,7 +51,6 @@
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
-import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@@ -68,6 +68,7 @@
 import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -88,9 +89,7 @@
   private static final String SERVER_SEGMENTS_TABLE = "server_segments";
   private static final String TASKS_TABLE = "tasks";
 
-  private static final EmittingLogger log = new EmittingLogger(SystemSchema.class);
-
-  private static final RowSignature SEGMENTS_SIGNATURE = RowSignature
+  static final RowSignature SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("segment_id", ValueType.STRING)
       .add("datasource", ValueType.STRING)
@@ -98,7 +97,7 @@
       .add("end", ValueType.STRING)
       .add("size", ValueType.LONG)
       .add("version", ValueType.STRING)
-      .add("partition_num", ValueType.STRING)
+      .add("partition_num", ValueType.LONG)
       .add("num_replicas", ValueType.LONG)
       .add("num_rows", ValueType.LONG)
       .add("is_published", ValueType.LONG)
@@ -107,25 +106,25 @@
       .add("payload", ValueType.STRING)
       .build();
 
-  private static final RowSignature SERVERS_SIGNATURE = RowSignature
+  static final RowSignature SERVERS_SIGNATURE = RowSignature
       .builder()
       .add("server", ValueType.STRING)
       .add("host", ValueType.STRING)
-      .add("plaintext_port", ValueType.STRING)
-      .add("tls_port", ValueType.STRING)
+      .add("plaintext_port", ValueType.LONG)
+      .add("tls_port", ValueType.LONG)
       .add("server_type", ValueType.STRING)
       .add("tier", ValueType.STRING)
       .add("curr_size", ValueType.LONG)
       .add("max_size", ValueType.LONG)
       .build();
 
-  private static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
+  static final RowSignature SERVER_SEGMENTS_SIGNATURE = RowSignature
       .builder()
       .add("server", ValueType.STRING)
       .add("segment_id", ValueType.STRING)
       .build();
 
-  private static final RowSignature TASKS_SIGNATURE = RowSignature
+  static final RowSignature TASKS_SIGNATURE = RowSignature
       .builder()
       .add("task_id", ValueType.STRING)
       .add("type", ValueType.STRING)
@@ -137,8 +136,8 @@
       .add("duration", ValueType.LONG)
       .add("location", ValueType.STRING)
       .add("host", ValueType.STRING)
-      .add("plaintext_port", ValueType.STRING)
-      .add("tls_port", ValueType.STRING)
+      .add("plaintext_port", ValueType.LONG)
+      .add("tls_port", ValueType.LONG)
       .add("error_msg", ValueType.STRING)
       .build();
 
@@ -156,12 +155,24 @@ public SystemSchema(
   {
     Preconditions.checkNotNull(serverView, "serverView");
     BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler();
-    this.tableMap = ImmutableMap.of(
-        SEGMENTS_TABLE, new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper),
-        SERVERS_TABLE, new ServersTable(serverView, authorizerMapper),
-        SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper),
-        TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
-    );
+    this.tableMap = ImmutableMap.<String, Table>builder()
+        .put(
+            SEGMENTS_TABLE,
+            new SegmentsTable(druidSchema, coordinatorDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
+        )
+        .put(
+            SERVERS_TABLE,
+            new ServersTable(serverView, authorizerMapper)
+        )
+        .put(
+            SERVER_SEGMENTS_TABLE,
+            new ServerSegmentsTable(serverView, authorizerMapper)
+        )
+        .put(
+            TASKS_TABLE,
+            new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper)
+        )
+        .build();
   }
 
   @Override
@@ -248,11 +259,11 @@ public TableType getJdbcTableType()
               return new Object[]{
                   val.getIdentifier(),
                   val.getDataSource(),
-                  val.getInterval().getStart(),
-                  val.getInterval().getEnd(),
+                  val.getInterval().getStart().toString(),
+                  val.getInterval().getEnd().toString(),
                   val.getSize(),
                   val.getVersion(),
-                  val.getShardSpec().getPartitionNum(),
+                  Long.valueOf(val.getShardSpec().getPartitionNum()),
                   numReplicas,
                   numRows,
                   1L, //is_published is true for published segments
@@ -276,16 +287,17 @@ public TableType getJdbcTableType()
               if (segmentsAlreadySeen.contains(val.getKey().getIdentifier())) {
                 return null;
               }
+              final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getKey().getIdentifier());
+              final long numReplicas = partialSegmentData == null ? 0L : partialSegmentData.getNumReplicas();
               return new Object[]{
                   val.getKey().getIdentifier(),
                   val.getKey().getDataSource(),
-                  val.getKey().getInterval().getStart(),
-                  val.getKey().getInterval().getEnd(),
+                  val.getKey().getInterval().getStart().toString(),
+                  val.getKey().getInterval().getEnd().toString(),
                   val.getKey().getSize(),
                   val.getKey().getVersion(),
-                  val.getKey().getShardSpec().getPartitionNum(),
-                  partialSegmentDataMap.get(val.getKey().getIdentifier()) == null ? 0L
-                    : partialSegmentDataMap.get(val.getKey().getIdentifier()).getNumReplicas(),
+                  Long.valueOf(val.getKey().getShardSpec().getPartitionNum()),
+                  numReplicas,
                   val.getValue().getNumRows(),
                   val.getValue().isPublished(),
                   val.getValue().isAvailable(),
@@ -471,10 +483,10 @@ public TableType getJdbcTableType()
           .from(druidServers)
           .transform(val -> new Object[]{
               val.getHost(),
-              val.getHost().split(":")[0],
-              val.getHostAndPort() == null ? -1 : val.getHostAndPort().split(":")[1],
-              val.getHostAndTlsPort() == null ? -1 : val.getHostAndTlsPort().split(":")[1],
-              val.getType(),
+              extractHost(val.getHost()),
+              (long) extractPort(val.getHostAndPort()),
+              (long) extractPort(val.getHostAndTlsPort()),
+              toStringOrNull(val.getType()),
               val.getTier(),
               val.getCurrSize(),
               val.getMaxSize()
@@ -583,24 +595,33 @@ public TasksEnumerable(JsonParserIterator<TaskStatusPlus> tasks)
             @Override
             public Object[] current()
             {
-              TaskStatusPlus task = it.next();
-              return new Object[]{task.getId(),
-                                  task.getType(),
-                                  task.getDataSource(),
-                                  task.getCreatedTime(),
-                                  task.getQueueInsertionTime(),
-                                  task.getStatusCode(),
-                                  task.getRunnerStatusCode(),
-                                  task.getDuration(),
-                                  task.getLocation().getHost() + ":" + (task.getLocation().getTlsPort()
-                                                                          == -1
-                                                                          ? task.getLocation()
-                                                                                .getPort()
-                                                                          : task.getLocation().getTlsPort()),
-                                  task.getLocation().getHost(),
-                                  task.getLocation().getPort(),
-                                  task.getLocation().getTlsPort(),
-                                  task.getErrorMsg()};
+              final TaskStatusPlus task = it.next();
+              final String hostAndPort;
+
+              if (task.getLocation().getHost() == null) {
+                hostAndPort = null;
+              } else {
+                final int port = task.getLocation().getTlsPort() >= 0
+                                 ? task.getLocation().getTlsPort()
+                                 : task.getLocation().getPort();
+
+                hostAndPort = HostAndPort.fromParts(task.getLocation().getHost(), port).toString();
+              }
+              return new Object[]{
+                  task.getId(),
+                  task.getType(),
+                  task.getDataSource(),
+                  toStringOrNull(task.getCreatedTime()),
+                  toStringOrNull(task.getQueueInsertionTime()),
+                  toStringOrNull(task.getStatusCode()),
+                  toStringOrNull(task.getRunnerStatusCode()),
+                  task.getDuration() == null ? 0L : task.getDuration(),
+                  hostAndPort,
+                  task.getLocation().getHost(),
+                  (long) task.getLocation().getPort(),
+                  (long) task.getLocation().getTlsPort(),
+                  task.getErrorMsg()
+              };
             }
 
             @Override
@@ -632,7 +653,10 @@ public void close()
       return new TasksEnumerable(getTasks(druidLeaderClient, jsonMapper, responseHandler));
     }
 
-    private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(JsonParserIterator<TaskStatusPlus> it, DataContext root)
+    private CloseableIterator<TaskStatusPlus> getAuthorizedTasks(
+        JsonParserIterator<TaskStatusPlus> it,
+        DataContext root
+    )
     {
       final AuthenticationResult authenticationResult =
           (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
@@ -723,4 +747,32 @@ public void close() throws IOException
     };
   }
 
+  @Nullable
+  private static String extractHost(@Nullable final String hostAndPort)
+  {
+    if (hostAndPort == null) {
+      return null;
+    }
+
+    return HostAndPort.fromString(hostAndPort).getHostText();
+  }
+
+  private static int extractPort(@Nullable final String hostAndPort)
+  {
+    if (hostAndPort == null) {
+      return -1;
+    }
+
+    return HostAndPort.fromString(hostAndPort).getPortOrDefault(-1);
+  }
+
+  @Nullable
+  private static String toStringOrNull(@Nullable final Object object)
+  {
+    if (object == null) {
+      return null;
+    }
+
+    return object.toString();
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 3537ad62bef..851adff9454 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -26,8 +26,6 @@
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
 import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -39,8 +37,10 @@
 import org.apache.druid.client.TimelineServerView;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.discovery.DruidLeaderClient;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.http.client.Request;
 import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
@@ -54,6 +54,7 @@
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordination.DruidServerMetadata;
@@ -64,6 +65,7 @@
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.table.RowSignature;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
@@ -147,10 +149,10 @@ public void setUp() throws Exception
     responseHandler = EasyMock.createMockBuilder(BytesAccumulatingResponseHandler.class)
                               .withConstructor()
                               .addMockedMethod(
-                                      "handleResponse",
-                                      HttpResponse.class,
-                                      HttpResponseHandler.TrafficCop.class
-                                  )
+                                  "handleResponse",
+                                  HttpResponse.class,
+                                  HttpResponseHandler.TrafficCop.class
+                              )
                               .addMockedMethod("getStatus")
                               .createMock();
     request = EasyMock.createMock(Request.class);
@@ -441,36 +443,34 @@ public Object get(String name)
         return CalciteTests.SUPER_USER_AUTH_RESULT;
       }
     };
-    Enumerable<Object[]> rows = segmentsTable.scan(dataContext);
-    Enumerator<Object[]> enumerator = rows.enumerator();
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row1 = enumerator.current();
-    //segment 6 is published and unavailable, num_replicas is 0
-    Assert.assertEquals(1L, row1[9]);
-    Assert.assertEquals(0L, row1[7]);
-    Assert.assertEquals(0L, row1[8]); //numRows = 0
+    final List<Object[]> rows = segmentsTable.scan(dataContext).toList();
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Assert.assertEquals(true, enumerator.moveNext());
-    Assert.assertEquals(true, enumerator.moveNext());
-    Assert.assertEquals(true, enumerator.moveNext());
+    Assert.assertEquals(6, rows.size());
+
+    Object[] row0 = rows.get(0);
+    //segment 6 is published and unavailable, num_replicas is 0
+    Assert.assertEquals(1L, row0[9]);
+    Assert.assertEquals(0L, row0[7]);
+    Assert.assertEquals(0L, row0[8]); //numRows = 0
 
-    Object[] row5 = enumerator.current();
+    Object[] row4 = rows.get(4);
     //segment 2 is published and has 2 replicas
-    Assert.assertEquals(1L, row5[9]);
-    Assert.assertEquals(2L, row5[7]);
-    Assert.assertEquals(3L, row5[8]);  //numRows = 3
-    Assert.assertEquals(true, enumerator.moveNext());
-    Assert.assertEquals(false, enumerator.moveNext());
+    Assert.assertEquals(1L, row4[9]);
+    Assert.assertEquals(2L, row4[7]);
+    Assert.assertEquals(3L, row4[8]);  //numRows = 3
 
+    // Verify value types.
+    verifyTypes(rows, SystemSchema.SEGMENTS_SIGNATURE);
   }
 
   @Test
   public void testServersTable()
   {
 
-    SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class).withConstructor(serverView, authMapper).createMock();
+    SystemSchema.ServersTable serversTable = EasyMock.createMockBuilder(SystemSchema.ServersTable.class)
+                                                     .withConstructor(serverView, authMapper)
+                                                     .createMock();
     EasyMock.replay(serversTable);
 
     EasyMock.expect(serverView.getDruidServers())
@@ -503,14 +503,17 @@ public Object get(String name)
         return CalciteTests.SUPER_USER_AUTH_RESULT;
       }
     };
-    Enumerable<Object[]> rows = serversTable.scan(dataContext);
-    Assert.assertEquals(2, rows.count());
-    Object[] row1 = rows.first();
+    final List<Object[]> rows = serversTable.scan(dataContext).toList();
+    Assert.assertEquals(2, rows.size());
+    Object[] row1 = rows.get(0);
     Assert.assertEquals("localhost:0000", row1[0]);
     Assert.assertEquals("realtime", row1[4].toString());
-    Object[] row2 = rows.last();
+    Object[] row2 = rows.get(1);
     Assert.assertEquals("server2:1234", row2[0]);
     Assert.assertEquals("historical", row2[4].toString());
+
+    // Verify value types.
+    verifyTypes(rows, SystemSchema.SERVERS_SIGNATURE);
   }
 
   @Test
@@ -550,8 +553,6 @@ public Object get(String name)
         return CalciteTests.SUPER_USER_AUTH_RESULT;
       }
     };
-    Enumerable<Object[]> rows = serverSegmentsTable.scan(dataContext);
-    Assert.assertEquals(5, rows.count());
 
     //server_segments table is the join of servers and segments table
     // it will have 5 rows as follows
@@ -561,34 +562,31 @@ public Object get(String name)
     // server2:1234   |  test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4(segment4)
     // server2:1234   |  test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5(segment5)
 
-    Enumerator<Object[]> enumerator = rows.enumerator();
-    Assert.assertEquals(true, enumerator.moveNext());
+    final List<Object[]> rows = serverSegmentsTable.scan(dataContext).toList();
+    Assert.assertEquals(5, rows.size());
+
+    Object[] row0 = rows.get(0);
+    Assert.assertEquals("localhost:0000", row0[0]);
+    Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row0[1]);
 
-    Object[] row1 = rows.first();
+    Object[] row1 = rows.get(1);
     Assert.assertEquals("localhost:0000", row1[0]);
-    Assert.assertEquals("test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1", row1[1]);
+    Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row1[1]);
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row2 = enumerator.current();
-    Assert.assertEquals("localhost:0000", row2[0]);
-    Assert.assertEquals("test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2", row2[1]);
+    Object[] row2 = rows.get(2);
+    Assert.assertEquals("server2:1234", row2[0]);
+    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row2[1]);
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row3 = enumerator.current();
+    Object[] row3 = rows.get(3);
     Assert.assertEquals("server2:1234", row3[0]);
-    Assert.assertEquals("test3_2012-01-01T00:00:00.000Z_2013-01-01T00:00:00.000Z_version3", row3[1]);
+    Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1]);
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row4 = enumerator.current();
+    Object[] row4 = rows.get(4);
     Assert.assertEquals("server2:1234", row4[0]);
-    Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row4[1]);
+    Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1]);
 
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row5 = rows.last();
-    Assert.assertEquals("server2:1234", row5[0]);
-    Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row5[1]);
-
-    Assert.assertEquals(false, enumerator.moveNext());
+    // Verify value types.
+    verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE);
   }
 
   @Test
@@ -672,26 +670,78 @@ public Object get(String name)
         return CalciteTests.SUPER_USER_AUTH_RESULT;
       }
     };
-    Enumerable<Object[]> rows = tasksTable.scan(dataContext);
-    Enumerator<Object[]> enumerator = rows.enumerator();
-
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row1 = enumerator.current();
-    Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row1[0].toString());
-    Assert.assertEquals("FAILED", row1[5].toString());
-    Assert.assertEquals("NONE", row1[6].toString());
-    Assert.assertEquals(-1L, row1[7]);
-    Assert.assertEquals("testHost:1234", row1[8]);
-
-    Assert.assertEquals(true, enumerator.moveNext());
-    Object[] row2 = enumerator.current();
-    Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row2[0].toString());
-    Assert.assertEquals("RUNNING", row2[5].toString());
-    Assert.assertEquals("RUNNING", row2[6].toString());
-    Assert.assertEquals(null, row2[7]);
-    Assert.assertEquals("192.168.1.6:8100", row2[8]);
-
-    Assert.assertEquals(false, enumerator.moveNext());
+    final List<Object[]> rows = tasksTable.scan(dataContext).toList();
+
+    Object[] row0 = rows.get(0);
+    Assert.assertEquals("index_wikipedia_2018-09-20T22:33:44.911Z", row0[0].toString());
+    Assert.assertEquals("FAILED", row0[5].toString());
+    Assert.assertEquals("NONE", row0[6].toString());
+    Assert.assertEquals(-1L, row0[7]);
+    Assert.assertEquals("testHost:1234", row0[8]);
+
+    Object[] row1 = rows.get(1);
+    Assert.assertEquals("index_wikipedia_2018-09-21T18:38:47.773Z", row1[0].toString());
+    Assert.assertEquals("RUNNING", row1[5].toString());
+    Assert.assertEquals("RUNNING", row1[6].toString());
+    Assert.assertEquals(0L, row1[7]);
+    Assert.assertEquals("192.168.1.6:8100", row1[8]);
+
+    // Verify value types.
+    verifyTypes(rows, SystemSchema.TASKS_SIGNATURE);
   }
 
+  private static void verifyTypes(final List<Object[]> rows, final RowSignature signature)
+  {
+    final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl());
+
+    for (Object[] row : rows) {
+      Assert.assertEquals(row.length, signature.getRowOrder().size());
+
+      for (int i = 0; i < row.length; i++) {
+        final Class<?> expectedClass;
+
+        final ValueType columnType = signature.getColumnType(signature.getRowOrder().get(i));
+        final boolean nullable = rowType.getFieldList().get(i).getType().isNullable();
+
+        switch (columnType) {
+          case LONG:
+            expectedClass = Long.class;
+            break;
+          case FLOAT:
+            expectedClass = Float.class;
+            break;
+          case DOUBLE:
+            expectedClass = Double.class;
+            break;
+          case STRING:
+            expectedClass = String.class;
+            break;
+          default:
+            throw new IAE("Don't know what class to expect for valueType[%s]", columnType);
+        }
+
+        if (nullable) {
+          Assert.assertTrue(
+              StringUtils.format(
+                  "Column[%s] is a [%s] or null (was %s)",
+                  signature.getRowOrder().get(i),
+                  expectedClass.getName(),
+                  row[i] == null ? null : row[i].getClass().getName()
+              ),
+              row[i] == null || expectedClass.isAssignableFrom(row[i].getClass())
+          );
+        } else {
+          Assert.assertTrue(
+              StringUtils.format(
+                  "Column[%s] is a [%s] (was %s)",
+                  signature.getRowOrder().get(i),
+                  expectedClass.getName(),
+                  row[i] == null ? null : row[i].getClass().getName()
+              ),
+              row[i] != null && expectedClass.isAssignableFrom(row[i].getClass())
+          );
+        }
+      }
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org