You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/11 16:57:14 UTC

incubator-gobblin git commit: [GOBBLIN-283] Refactor EnvelopePayloadConverter to support multi fields conversion

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a9c9f781f -> becb2b786


[GOBBLIN-283] Refactor EnvelopePayloadConverter to support multi fields conversion

Closes #2136 from zxcware/envref


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/becb2b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/becb2b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/becb2b78

Branch: refs/heads/master
Commit: becb2b78600cabba01ce2fb8d3d039dc283c458e
Parents: a9c9f78
Author: zhchen <zh...@linkedin.com>
Authored: Wed Oct 11 09:57:07 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 11 09:57:07 2017 -0700

----------------------------------------------------------------------
 .../converter/BaseEnvelopeSchemaConverter.java  | 50 +++++++++++++++++---
 .../converter/EnvelopePayloadConverter.java     | 44 +++++++++++------
 2 files changed, 73 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
index d220902..be59c2a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
@@ -36,10 +36,12 @@ import com.google.common.base.Optional;
  * Base class for an envelope schema converter using {@link KafkaSchemaRegistry}
  */
 public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
-  public static final String PAYLOAD_SCHEMA_ID_FIELD = "converter.envelopeSchemaConverter.schemaIdField";
-  public static final String PAYLOAD_FIELD = "converter.envelopeSchemaConverter.payloadField";
-  public static final String PAYLOAD_SCHEMA_TOPIC = "converter.envelopeSchemaConverter.payloadSchemaTopic";
-  public static final String KAFKA_REGISTRY_FACTORY = "converter.envelopeSchemaConverter.kafkaRegistryFactory";
+  public static final String CONF_PREFIX = "converter.envelopeSchemaConverter.";
+
+  public static final String PAYLOAD_SCHEMA_ID_FIELD = CONF_PREFIX + "schemaIdField";
+  public static final String PAYLOAD_FIELD =  CONF_PREFIX + "payloadField";
+  public static final String PAYLOAD_SCHEMA_TOPIC =  CONF_PREFIX + "payloadSchemaTopic";
+  public static final String KAFKA_REGISTRY_FACTORY =  CONF_PREFIX + "kafkaRegistryFactory";
 
   public static final String DEFAULT_PAYLOAD_FIELD = "payload";
   public static final String DEFAULT_PAYLOAD_SCHEMA_ID_FIELD = "payloadSchemaId";
@@ -81,12 +83,25 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S
    *
    * @param inputRecord the input record which has the payload
    * @return the current schema of the payload
+   * @deprecated use {@link #getFieldSchema(GenericRecord, String)}
    */
+  @Deprecated
   protected Schema getPayloadSchema(GenericRecord inputRecord)
       throws Exception {
-    Optional<Object> schemaIdValue = AvroUtils.getFieldValue(inputRecord, payloadSchemaIdField);
+    return getFieldSchema(inputRecord, payloadSchemaIdField);
+  }
+
+  /**
+   * Get the schema of a field
+   *
+   * @param record the input record which has the schema id
+   * @param schemaIdLocation a dot separated location string to the schema id
+   * @return a schema referenced by the schema id
+   */
+  protected Schema getFieldSchema(GenericRecord record, String schemaIdLocation) throws Exception {
+    Optional<Object> schemaIdValue = AvroUtils.getFieldValue(record, schemaIdLocation);
     if (!schemaIdValue.isPresent()) {
-      throw new Exception("Schema id with key " + payloadSchemaIdField + " not found in the record");
+      throw new Exception("Schema id with key " + schemaIdLocation + " not found in the record");
     }
     String schemaKey = String.valueOf(schemaIdValue.get());
     return (Schema) registry.getSchemaByKey(schemaKey);
@@ -97,9 +112,30 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S
    *
    * @param inputRecord the input record which has the payload
    * @return the byte array of the payload in the input record
+   * @deprecated use {@link #getFieldAsBytes(GenericRecord, String)}
    */
+  @Deprecated
   protected byte[] getPayloadBytes(GenericRecord inputRecord) {
-    ByteBuffer bb = (ByteBuffer) inputRecord.get(payloadField);
+    try {
+      return getFieldAsBytes(inputRecord, payloadField);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  /**
+   * Get field value byte array
+   *
+   * @param record the input record which has the field
+   * @param fieldLocation a dot separated location string to the field
+   * @return the byte array of field value
+   */
+  protected byte[] getFieldAsBytes(GenericRecord record, String fieldLocation) throws Exception {
+    Optional<Object> bytesValue = AvroUtils.getFieldValue(record, fieldLocation);
+    if (!bytesValue.isPresent()) {
+      throw new Exception("Bytes value with key " + fieldLocation + " not found in the record");
+    }
+    ByteBuffer bb = (ByteBuffer) bytesValue.get();
     if (bb.hasArray()) {
       return bb.array();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
index ca63ac8..6408a4c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -32,7 +32,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
  *
  * <p> Given an envelope schema as the input schema, the output schema will have the payload
  * field, configured by key {@value PAYLOAD_FIELD}, set with its latest schema fetched from a
- * {@link #registry} (see {@code createDecoratedField(Field)}). The converter copies the other fields
+ * {@link #registry} (see {@code createLatestPayloadField(Field)}). The converter copies the other fields
  * from the input schema to the output schema
  *
  * <p> Given an envelope record as the input record, the output record will have the payload set
@@ -51,13 +51,7 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
       throws SchemaConversionException {
     List<Field> outputSchemaFields = new ArrayList<>();
     for (Field field : inputSchema.getFields()) {
-      if (field.name().equals(payloadField)) {
-        // Decorate the field with full schema
-        outputSchemaFields.add(createDecoratedField(field));
-      } else {
-        // Make a copy of the field to the output schema
-        outputSchemaFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()));
-      }
+      outputSchemaFields.add(convertFieldSchema(inputSchema, field, workUnit));
     }
 
     Schema outputSchema = Schema
@@ -67,12 +61,26 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
   }
 
   /**
+   * Convert to the output schema of a field
+   */
+  protected Field convertFieldSchema(Schema inputSchema, Field field, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    if (field.name().equals(payloadField)) {
+      // Create a payload field with latest schema
+      return createLatestPayloadField(field);
+    }
+    // Make a copy of the field to the output schema
+    return new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
+  }
+
+  /**
    * Create a payload field with its latest schema fetched from {@link #registry}
    *
    * @param field the original payload field from input envelope schema
    * @return a new payload field with its latest schema
    */
-  private Field createDecoratedField(Field field) throws SchemaConversionException {
+  private Field createLatestPayloadField(Field field)
+      throws SchemaConversionException {
     try {
       Schema payloadSchema = fetchLatestPayloadSchema();
       return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(), field.order());
@@ -86,12 +94,20 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
       throws DataConversionException {
     GenericRecord outputRecord = new GenericData.Record(outputSchema);
     for (Field field : inputRecord.getSchema().getFields()) {
-      if (field.name().equals(payloadField)) {
-        outputRecord.put(payloadField, upConvertPayload(inputRecord));
-      } else {
-        outputRecord.put(field.name(), inputRecord.get(field.name()));
-      }
+      outputRecord.put(field.name(), convertFieldValue(outputSchema, field, inputRecord, workUnit));
     }
     return new SingleRecordIterable<>(outputRecord);
   }
+
+  /**
+   * Convert to the output value of a field
+   */
+  protected Object convertFieldValue(Schema outputSchema, Field field, GenericRecord inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    if (field.name().equals(payloadField)) {
+      return upConvertPayload(inputRecord);
+    }
+    return inputRecord.get(field.name());
+  }
 }