You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/18 08:40:07 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #13771: [BEAM-11648] First step in creation of Vortex sink

chamikaramj commented on a change in pull request #13771:
URL: https://github.com/apache/beam/pull/13771#discussion_r578215398



##########
File path: sdks/java/extensions/google-cloud-platform-core/build.gradle
##########
@@ -17,6 +17,7 @@
  */
 
 import groovy.json.JsonOutput
+import org.apache.beam.gradle.GrpcVendoring_1_26_0

Review comment:
       Is this used?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -411,6 +411,21 @@ public static TableReference parseTableSpec(String tableSpec) {
     return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
   }
 
+  public static TableReference parseTableUrn(String tableUrn) {
+    Matcher match = BigQueryIO.TABLE_URN_SPEC.matcher(tableUrn);
+    if (!match.matches()) {
+      throw new IllegalArgumentException(
+          "Table reference is not in projects/[project_id]/datasets/[dataset_id]/tables/[table_id] "

Review comment:
       Why would this be in this format (and not "project:dataset.table")?
   
   It would probably be good to link to some documentation that describes the convention used here.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
##########
@@ -411,6 +411,21 @@ public static TableReference parseTableSpec(String tableSpec) {
     return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
   }
 
+  public static TableReference parseTableUrn(String tableUrn) {

Review comment:
       I couldn't find any usages for this here.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -74,6 +88,17 @@
 })
 public class BigQueryUtils {
 
+  public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)

Review comment:
       How will the Descriptor be used?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.EncryptionConfiguration;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+
+public class CreateTableHelpers {
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  static TableDestination possiblyCreateTable(
+      DoFn<?, ?>.ProcessContext context,
+      TableDestination tableDestination,
+      Supplier<TableSchema> schemaSupplier,
+      CreateDisposition createDisposition,
+      Coder<?> tableDestinationCoder,
+      String kmsKey,
+      BigQueryServices bqServices) {
+    checkArgument(
+        tableDestination.getTableSpec() != null,
+        "DynamicDestinations.getTable() must return a TableDestination "
+            + "with a non-null table spec, but %s returned %s for destination %s,"
+            + "which has a null table spec",
+        tableDestination);
+    boolean destinationCoderSupportsClustering =
+        !(tableDestinationCoder instanceof TableDestinationCoderV2);
+    checkArgument(
+        tableDestination.getClustering() == null || destinationCoderSupportsClustering,
+        "DynamicDestinations.getTable() may only return destinations with clustering configured"
+            + " if a destination coder is supplied that supports clustering, but %s is configured"
+            + " to use TableDestinationCoderV2. Set withClustering() on BigQueryIO.write() and, "
+            + " if you provided a custom DynamicDestinations instance, override"
+            + " getDestinationCoder() to return TableDestinationCoderV3.");
+    TableReference tableReference = tableDestination.getTableReference().clone();
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      tableReference.setProjectId(
+          context.getPipelineOptions().as(BigQueryOptions.class).getProject());
+      tableDestination = tableDestination.withTableReference(tableReference);
+    }
+    if (createDisposition == CreateDisposition.CREATE_NEVER) {
+      return tableDestination;
+    }
+
+    String tableSpec = BigQueryHelpers.stripPartitionDecorator(tableDestination.getTableSpec());
+    if (!createdTables.contains(tableSpec)) {
+      // Another thread may have succeeded in creating the table in the meanwhile, so
+      // check again. This check isn't needed for correctness, but we add it to prevent
+      // every thread from attempting a create and overwhelming our BigQuery quota.
+      synchronized (createdTables) {
+        if (!createdTables.contains(tableSpec)) {
+          tryCreateTable(
+              context,
+              schemaSupplier,
+              tableDestination,
+              createDisposition,
+              tableSpec,
+              kmsKey,
+              bqServices);
+        }
+      }
+    }
+    return tableDestination;
+  }
+
+  @SuppressWarnings({"nullness"})
+  private static void tryCreateTable(

Review comment:
       Ditto.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    for (TableFieldSchema fieldSchema : tableFieldSchemas) {
+      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+    }
+    return descriptorBuilder.build();
+  }
+
+  public static void fieldDescriptorFromTableField(
+      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
+        break;
+      case "BYTES":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
+        break;
+      case "INT64":
+      case "INTEGER":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "FLOAT64":
+      case "FLOAT":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
+        break;
+      case "BOOL":
+      case "BOOLEAN":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
+        break;
+      case "TIMESTAMP":
+      case "TIME":
+      case "DATETIME":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "DATE":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
+        break;
+      case "STRUCT":
+      case "RECORD":
+        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+        descriptorBuilder.addNestedType(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
+    }
+
+    Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
+    if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+    } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    descriptorBuilder.addField(fieldDescriptorBuilder.build());
+  }
+
+  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
+      Object value =
+          messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  public static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
+    if (bqValue == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, bqValue);
+  }
+
+  private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
+      ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+          .put(FieldDescriptor.Type.INT32, Integer::valueOf)
+          .put(FieldDescriptor.Type.INT64, Long::valueOf)
+          .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
+          .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
+          .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
+          .put(FieldDescriptor.Type.STRING, str -> str)
+          .put(
+              FieldDescriptor.Type.BYTES,
+              b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
+          .build();
+
+  private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
+    if (jsonBQValue instanceof String) {
+      Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
+      if (mapper != null) {
+        return mapper.apply((String) jsonBQValue);
+      }
+    } else if (jsonBQValue instanceof Integer) {
+      switch (fieldDescriptor.getJavaType()) {
+        case INT:
+          return Integer.valueOf((int) jsonBQValue);
+        case LONG:
+          return Long.valueOf((int) jsonBQValue);
+        default:
+          throw new RuntimeException("foo");
+      }
+    } else if (jsonBQValue instanceof List) {
+      return ((List<Object>) jsonBQValue)
+          .stream()
+              .map(v -> ((Map<String, Object>) v).get("v"))
+              .map(v -> toProtoValue(fieldDescriptor, v))
+              .collect(toList());
+    } else if (jsonBQValue instanceof AbstractMap) {
+      // This will handle nested rows.
+      TableRow tr = new TableRow();
+      tr.putAll((AbstractMap<String, Object>) jsonBQValue);
+      return messageFromTableRow(fieldDescriptor.getMessageType(), tr);
+    } else if (jsonBQValue instanceof TableRow) {

Review comment:
       Did you expect to fall through here? (If so, probably add a comment.)

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -74,6 +88,17 @@
 })
 public class BigQueryUtils {
 
+  public static Descriptor getDescriptorFromTableSchema(TableSchema jsonSchema)

Review comment:
       We should probably add some comments to document the API of the public util functions here.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.api.services.bigquery.model.EncryptionConfiguration;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+
+public class CreateTableHelpers {
+  /**
+   * The list of tables created so far, so we don't try the creation each time.
+   *
+   * <p>TODO: We should put a bound on memory usage of this. Use guava cache instead.
+   */
+  private static Set<String> createdTables =
+      Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+  static TableDestination possiblyCreateTable(

Review comment:
       This seems like mostly refactoring, right?
   Let me know if there are subtle changes that should be reviewed closely.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));

Review comment:
       Probably add a comment on why the replacement is needed.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
##########
@@ -405,6 +430,185 @@ public static Schema fromTableSchema(TableSchema tableSchema, SchemaConversionOp
     return fromTableFieldSchema(tableSchema.getFields(), options);
   }
 
+  public static DescriptorProto descriptorSchemaFromTableSchema(TableSchema tableSchema) {
+    return descriptorSchemaFromTableFieldSchemas(tableSchema.getFields());
+  }
+
+  public static DescriptorProto descriptorSchemaFromTableFieldSchemas(
+      Iterable<TableFieldSchema> tableFieldSchemas) {
+    DescriptorProto.Builder descriptorBuilder = DescriptorProto.newBuilder();
+    descriptorBuilder.setName("D" + UUID.randomUUID().toString().replace("-", "_"));
+    int i = 1;
+    for (TableFieldSchema fieldSchema : tableFieldSchemas) {
+      fieldDescriptorFromTableField(fieldSchema, i++, descriptorBuilder);
+    }
+    return descriptorBuilder.build();
+  }
+
+  public static void fieldDescriptorFromTableField(
+      TableFieldSchema fieldSchema, int fieldNumber, DescriptorProto.Builder descriptorBuilder) {
+    FieldDescriptorProto.Builder fieldDescriptorBuilder = FieldDescriptorProto.newBuilder();
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setName(fieldSchema.getName());
+    fieldDescriptorBuilder = fieldDescriptorBuilder.setNumber(fieldNumber);
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_STRING);
+        break;
+      case "BYTES":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BYTES);
+        break;
+      case "INT64":
+      case "INTEGER":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "FLOAT64":
+      case "FLOAT":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_FLOAT);
+        break;
+      case "BOOL":
+      case "BOOLEAN":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_BOOL);
+        break;
+      case "TIMESTAMP":
+      case "TIME":
+      case "DATETIME":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT64);
+        break;
+      case "DATE":
+        fieldDescriptorBuilder = fieldDescriptorBuilder.setType(Type.TYPE_INT32);
+        break;
+      case "STRUCT":
+      case "RECORD":
+        DescriptorProto nested = descriptorSchemaFromTableFieldSchemas(fieldSchema.getFields());
+        descriptorBuilder.addNestedType(nested);
+        fieldDescriptorBuilder =
+            fieldDescriptorBuilder.setType(Type.TYPE_MESSAGE).setTypeName(nested.getName());
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Converting BigQuery type " + fieldSchema.getType() + " to Beam type is unsupported");
+    }
+
+    Optional<Mode> fieldMode = Optional.ofNullable(fieldSchema.getMode()).map(Mode::valueOf);
+    if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REPEATED);
+    } else if (!fieldMode.isPresent() || fieldMode.filter(m -> m == Mode.NULLABLE).isPresent()) {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_OPTIONAL);
+    } else {
+      fieldDescriptorBuilder = fieldDescriptorBuilder.setLabel(Label.LABEL_REQUIRED);
+    }
+    descriptorBuilder.addField(fieldDescriptorBuilder.build());
+  }
+
+  public static DynamicMessage messageFromTableRow(Descriptor descriptor, TableRow tableRow) {
+    DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
+    for (FieldDescriptor fieldDescriptor : descriptor.getFields()) {
+      Object value =
+          messageValueFromFieldValue(fieldDescriptor, tableRow.get(fieldDescriptor.getName()));
+      if (value != null) {
+        builder.setField(fieldDescriptor, value);
+      }
+    }
+    return builder.build();
+  }
+
+  public static Object messageValueFromFieldValue(FieldDescriptor fieldDescriptor, Object bqValue) {
+    if (bqValue == null) {
+      if (fieldDescriptor.isOptional()) {
+        return null;
+      } else {
+        throw new IllegalArgumentException(
+            "Received null value for non-nullable field " + fieldDescriptor.getName());
+      }
+    }
+    return toProtoValue(fieldDescriptor, bqValue);
+  }
+
+  private static final Map<FieldDescriptor.Type, Function<String, Object>> JSON_PROTO_PARSERS =
+      ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
+          .put(FieldDescriptor.Type.INT32, Integer::valueOf)
+          .put(FieldDescriptor.Type.INT64, Long::valueOf)
+          .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
+          .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
+          .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
+          .put(FieldDescriptor.Type.STRING, str -> str)
+          .put(
+              FieldDescriptor.Type.BYTES,
+              b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
+          .build();
+
+  private static Object toProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
+    if (jsonBQValue instanceof String) {
+      Function<String, Object> mapper = JSON_PROTO_PARSERS.get(fieldDescriptor.getType());
+      if (mapper != null) {
+        return mapper.apply((String) jsonBQValue);
+      }
+    } else if (jsonBQValue instanceof Integer) {
+      switch (fieldDescriptor.getJavaType()) {
+        case INT:
+          return Integer.valueOf((int) jsonBQValue);
+        case LONG:
+          return Long.valueOf((int) jsonBQValue);
+        default:
+          throw new RuntimeException("foo");

Review comment:
       Update the comment ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org