You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/10/11 16:57:14 UTC
incubator-gobblin git commit: [GOBBLIN-283] Refactor
EnvelopePayloadConverter to support multi fields conversion
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a9c9f781f -> becb2b786
[GOBBLIN-283] Refactor EnvelopePayloadConverter to support multi fields conversion
Closes #2136 from zxcware/envref
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/becb2b78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/becb2b78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/becb2b78
Branch: refs/heads/master
Commit: becb2b78600cabba01ce2fb8d3d039dc283c458e
Parents: a9c9f78
Author: zhchen <zh...@linkedin.com>
Authored: Wed Oct 11 09:57:07 2017 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Oct 11 09:57:07 2017 -0700
----------------------------------------------------------------------
.../converter/BaseEnvelopeSchemaConverter.java | 50 +++++++++++++++++---
.../converter/EnvelopePayloadConverter.java | 44 +++++++++++------
2 files changed, 73 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
index d220902..be59c2a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/BaseEnvelopeSchemaConverter.java
@@ -36,10 +36,12 @@ import com.google.common.base.Optional;
* Base class for an envelope schema converter using {@link KafkaSchemaRegistry}
*/
public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, Schema, GenericRecord, GenericRecord> {
- public static final String PAYLOAD_SCHEMA_ID_FIELD = "converter.envelopeSchemaConverter.schemaIdField";
- public static final String PAYLOAD_FIELD = "converter.envelopeSchemaConverter.payloadField";
- public static final String PAYLOAD_SCHEMA_TOPIC = "converter.envelopeSchemaConverter.payloadSchemaTopic";
- public static final String KAFKA_REGISTRY_FACTORY = "converter.envelopeSchemaConverter.kafkaRegistryFactory";
+ public static final String CONF_PREFIX = "converter.envelopeSchemaConverter.";
+
+ public static final String PAYLOAD_SCHEMA_ID_FIELD = CONF_PREFIX + "schemaIdField";
+ public static final String PAYLOAD_FIELD = CONF_PREFIX + "payloadField";
+ public static final String PAYLOAD_SCHEMA_TOPIC = CONF_PREFIX + "payloadSchemaTopic";
+ public static final String KAFKA_REGISTRY_FACTORY = CONF_PREFIX + "kafkaRegistryFactory";
public static final String DEFAULT_PAYLOAD_FIELD = "payload";
public static final String DEFAULT_PAYLOAD_SCHEMA_ID_FIELD = "payloadSchemaId";
@@ -81,12 +83,25 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S
*
* @param inputRecord the input record which has the payload
* @return the current schema of the payload
+ * @deprecated use {@link #getFieldSchema(GenericRecord, String)}
*/
+ @Deprecated
protected Schema getPayloadSchema(GenericRecord inputRecord)
throws Exception {
- Optional<Object> schemaIdValue = AvroUtils.getFieldValue(inputRecord, payloadSchemaIdField);
+ return getFieldSchema(inputRecord, payloadSchemaIdField);
+ }
+
+ /**
+ * Get the schema of a field
+ *
+ * @param record the input record which has the schema id
+ * @param schemaIdLocation a dot separated location string to the schema id
+ * @return a schema referenced by the schema id
+ */
+ protected Schema getFieldSchema(GenericRecord record, String schemaIdLocation) throws Exception {
+ Optional<Object> schemaIdValue = AvroUtils.getFieldValue(record, schemaIdLocation);
if (!schemaIdValue.isPresent()) {
- throw new Exception("Schema id with key " + payloadSchemaIdField + " not found in the record");
+ throw new Exception("Schema id with key " + schemaIdLocation + " not found in the record");
}
String schemaKey = String.valueOf(schemaIdValue.get());
return (Schema) registry.getSchemaByKey(schemaKey);
@@ -97,9 +112,30 @@ public abstract class BaseEnvelopeSchemaConverter<P> extends Converter<Schema, S
*
* @param inputRecord the input record which has the payload
* @return the byte array of the payload in the input record
+ * @deprecated use {@link #getFieldAsBytes(GenericRecord, String)}
*/
+ @Deprecated
protected byte[] getPayloadBytes(GenericRecord inputRecord) {
- ByteBuffer bb = (ByteBuffer) inputRecord.get(payloadField);
+ try {
+ return getFieldAsBytes(inputRecord, payloadField);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ /**
+ * Get the value of a field as a byte array
+ *
+ * @param record the input record which has the field
+ * @param fieldLocation a dot separated location string to the field
+ * @return the byte array of field value
+ */
+ protected byte[] getFieldAsBytes(GenericRecord record, String fieldLocation) throws Exception {
+ Optional<Object> bytesValue = AvroUtils.getFieldValue(record, fieldLocation);
+ if (!bytesValue.isPresent()) {
+ throw new Exception("Bytes value with key " + fieldLocation + " not found in the record");
+ }
+ ByteBuffer bb = (ByteBuffer) bytesValue.get();
if (bb.hasArray()) {
return bb.array();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/becb2b78/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
index ca63ac8..6408a4c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/EnvelopePayloadConverter.java
@@ -32,7 +32,7 @@ import org.apache.gobblin.configuration.WorkUnitState;
*
* <p> Given an envelope schema as the input schema, the output schema will have the payload
* field, configured by key {@value PAYLOAD_FIELD}, set with its latest schema fetched from a
- * {@link #registry} (see {@code createDecoratedField(Field)}). The converter copies the other fields
+ * {@link #registry} (see {@code createLatestPayloadField(Field)}). The converter copies the other fields
* from the input schema to the output schema
*
* <p> Given an envelope record as the input record, the output record will have the payload set
@@ -51,13 +51,7 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
throws SchemaConversionException {
List<Field> outputSchemaFields = new ArrayList<>();
for (Field field : inputSchema.getFields()) {
- if (field.name().equals(payloadField)) {
- // Decorate the field with full schema
- outputSchemaFields.add(createDecoratedField(field));
- } else {
- // Make a copy of the field to the output schema
- outputSchemaFields.add(new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order()));
- }
+ outputSchemaFields.add(convertFieldSchema(inputSchema, field, workUnit));
}
Schema outputSchema = Schema
@@ -67,12 +61,26 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
}
/**
+ * Convert to the output schema of a field
+ */
+ protected Field convertFieldSchema(Schema inputSchema, Field field, WorkUnitState workUnit)
+ throws SchemaConversionException {
+ if (field.name().equals(payloadField)) {
+ // Create a payload field with latest schema
+ return createLatestPayloadField(field);
+ }
+ // Make a copy of the field to the output schema
+ return new Field(field.name(), field.schema(), field.doc(), field.defaultValue(), field.order());
+ }
+
+ /**
* Create a payload field with its latest schema fetched from {@link #registry}
*
* @param field the original payload field from input envelope schema
* @return a new payload field with its latest schema
*/
- private Field createDecoratedField(Field field) throws SchemaConversionException {
+ private Field createLatestPayloadField(Field field)
+ throws SchemaConversionException {
try {
Schema payloadSchema = fetchLatestPayloadSchema();
return new Field(field.name(), payloadSchema, DECORATED_PAYLOAD_DOC, field.defaultValue(), field.order());
@@ -86,12 +94,20 @@ public class EnvelopePayloadConverter extends BaseEnvelopeSchemaConverter<Generi
throws DataConversionException {
GenericRecord outputRecord = new GenericData.Record(outputSchema);
for (Field field : inputRecord.getSchema().getFields()) {
- if (field.name().equals(payloadField)) {
- outputRecord.put(payloadField, upConvertPayload(inputRecord));
- } else {
- outputRecord.put(field.name(), inputRecord.get(field.name()));
- }
+ outputRecord.put(field.name(), convertFieldValue(outputSchema, field, inputRecord, workUnit));
}
return new SingleRecordIterable<>(outputRecord);
}
+
+ /**
+ * Convert to the output value of a field
+ */
+ protected Object convertFieldValue(Schema outputSchema, Field field, GenericRecord inputRecord,
+ WorkUnitState workUnit)
+ throws DataConversionException {
+ if (field.name().equals(payloadField)) {
+ return upConvertPayload(inputRecord);
+ }
+ return inputRecord.get(field.name());
+ }
}