You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/19 18:40:42 UTC
incubator-gobblin git commit: [GOBBLIN-427] Add a decryption converter
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 0795fa7a0 -> b7f123f77
[GOBBLIN-427] Add a decryption converter
Closes #2304 from xzhang27/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b7f123f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b7f123f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b7f123f7
Branch: refs/heads/master
Commit: b7f123f77a58c690a9acf89f6d3168aeda259a17
Parents: 0795fa7
Author: Xiang <xn...@linkedin.com>
Authored: Mon Mar 19 11:39:46 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Mar 19 11:39:55 2018 -0700
----------------------------------------------------------------------
.../gobblin/crypto/EncryptionConfigParser.java | 4 +-
.../crypto/EncryptionConfigParserTest.java | 4 +-
.../copy/converter/DecryptConverter.java | 2 +-
...alizedRecordToSerializedRecordConverter.java | 44 +++++++++
...ordToEncryptedSerializedRecordConverter.java | 2 +-
.../StringFieldEncryptorConverter.java | 2 +-
...edRecordToSerializedRecordConverterTest.java | 95 ++++++++++++++++++++
...edRecordToSerializedRecordConverterBase.java | 79 ++++++++++++++++
...dRecordWithMetadataToRecordWithMetadata.java | 81 +++++++++++++++++
...ordWithMetadataToRecordWithMetadataTest.java | 75 ++++++++++++++++
10 files changed, 382 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
index 324365a..900b616 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
@@ -52,6 +52,7 @@ public class EncryptionConfigParser {
*/
static final String WRITER_ENCRYPT_PREFIX = ConfigurationKeys.WRITER_PREFIX + ".encrypt";
static final String CONVERTER_ENCRYPT_PREFIX = "converter.encrypt";
+ static final String CONVERTER_DECRYPT_PREFIX = "converter.decrypt";
public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
public static final String ENCRYPTION_KEYSTORE_PATH_KEY = "keystore_path";
@@ -72,7 +73,8 @@ public class EncryptionConfigParser {
* enum maps entity type to a configuration prefix.
*/
public enum EntityType {
- CONVERTER(CONVERTER_ENCRYPT_PREFIX),
+ CONVERTER_ENCRYPT(CONVERTER_ENCRYPT_PREFIX),
+ CONVERTER_DECRYPT(CONVERTER_DECRYPT_PREFIX),
WRITER(WRITER_ENCRYPT_PREFIX);
private final String configPrefix;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
index 68fbf37..54b52d4 100644
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
@@ -97,7 +97,7 @@ public class EncryptionConfigParserTest {
"keyname");
wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar");
- Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, wuState);
+ Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, wuState);
Assert.assertNotNull(parsedProperties, "Expected parser to only return one record");
Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config");
@@ -125,7 +125,7 @@ public class EncryptionConfigParserTest {
"keyname");
wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar");
- Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, entityName, wuState);
+ Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, entityName, wuState);
Assert.assertNotNull(parsedProperties, "Expected parser to only return one record");
Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config");
Assert.assertEquals(EncryptionConfigParser.getEncryptionType(parsedProperties), "aes_rotating");
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
index d074046..d9c6353 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
@@ -59,7 +59,7 @@ public class DecryptConverter extends DistcpConverter {
@Override
public Converter<String, String, FileAwareInputStream, FileAwareInputStream> init(WorkUnitState workUnit) {
Map<String, Object> config =
- EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, workUnit);
+ EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, workUnit);
if (config == null) {
// Backwards compatibility check: if no config was passed in via the standard config, revert back to GPG
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
new file mode 100644
index 0000000..2088eee
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Map;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.crypto.EncryptionFactory;
+
+
+/**
+ * Concrete {@link EncryptedSerializedRecordToSerializedRecordConverterBase} whose decrypting codec is
+ * produced by Gobblin's {@link EncryptionFactory} from the {@code converter.decrypt} settings of the
+ * current branch, passing this class's simple name as the entity name.
+ */
+public class EncryptedSerializedRecordToSerializedRecordConverter extends EncryptedSerializedRecordToSerializedRecordConverterBase {
+  /**
+   * Builds the decrypting {@link StreamCodec} from the job's decryption settings.
+   *
+   * @param config work unit state holding the decryption configuration
+   * @return the codec created by {@code EncryptionFactory.buildStreamCryptoProvider}
+   * @throws IllegalStateException if no decryption configuration was found
+   */
+  @Override
+  protected StreamCodec buildDecryptor(WorkUnitState config) {
+    String entityName = getClass().getSimpleName();
+    Map<String, Object> settings = EncryptionConfigParser.getConfigForBranch(
+        EncryptionConfigParser.EntityType.CONVERTER_DECRYPT, entityName, config);
+
+    if (settings != null) {
+      return EncryptionFactory.buildStreamCryptoProvider(settings);
+    }
+    throw new IllegalStateException("No decryption config specified in job - can't decrypt!");
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
index 2c830a9..be550c7 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
@@ -32,7 +32,7 @@ public class SerializedRecordToEncryptedSerializedRecordConverter extends Serial
@Override
protected StreamCodec buildEncryptor(WorkUnitState config) {
Map<String, Object> encryptionConfig =
- EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), config);
+ EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), config);
if (encryptionConfig == null) {
throw new IllegalStateException("No encryption config specified in job - can't encrypt!");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
index d6e8de1..f242ec5 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
@@ -47,7 +47,7 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, DATA> extends Conver
public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) {
super.init(workUnit);
Map<String, Object> config = EncryptionConfigParser
- .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), workUnit);
+ .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), workUnit);
encryptor = EncryptionFactory.buildStreamCryptoProvider(config);
String fieldsToEncryptConfig = workUnit.getProp(FIELDS_TO_ENCRYPT_CONFIG_NAME, null);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
new file mode 100644
index 0000000..efaeb16
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Iterator;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link EncryptedSerializedRecordToSerializedRecordConverter}, configured with the
+ * {@code insecure_shift} codec ({@link InsecureShiftCodec}).
+ */
+public class EncryptedSerializedRecordToSerializedRecordConverterTest {
+
+  private static final String DECRYPT_PREFIX = "converter.decrypt.";
+
+  private WorkUnitState workUnitState;
+  private EncryptedSerializedRecordToSerializedRecordConverter converter;
+  private RecordWithMetadata<byte[]> sampleRecord;
+  private byte[] shiftedValue;
+  private String insecureShiftTag;
+
+  /**
+   * Rebuilds every fixture before each test method. The tests write fork and decryption settings into
+   * the work unit state, and the converter appends to the sample record's metadata. Fixtures created
+   * only once (as with {@code @BeforeTest}) would leak that state between methods and make outcomes
+   * order-dependent. For example, {@link #throwsIfMisconfigured()} would stop throwing once another
+   * test had configured decryption.
+   */
+  @BeforeMethod
+  public void setUp() {
+    workUnitState = new WorkUnitState();
+    converter = new EncryptedSerializedRecordToSerializedRecordConverter();
+    // The tests expect the shift codec to decode "bcde" into "abcd".
+    sampleRecord = new RecordWithMetadata<>(new byte[]{'b', 'c', 'd', 'e'}, new Metadata());
+    shiftedValue = new byte[]{'a', 'b', 'c', 'd'};
+    insecureShiftTag = InsecureShiftCodec.TAG;
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void throwsIfMisconfigured()
+      throws DataConversionException {
+    converter.init(workUnitState);
+    converter.convertRecord("", sampleRecord, workUnitState);
+  }
+
+  @Test
+  public void worksWithFork()
+      throws DataConversionException {
+    workUnitState.setProp(ConfigurationKeys.FORK_BRANCH_ID_KEY, 2);
+    workUnitState.getJobState()
+        .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY + ".2",
+            "insecure_shift");
+
+    converter.init(workUnitState);
+    assertSingleDecryptedRecord(converter.convertRecord("", sampleRecord, workUnitState));
+  }
+
+  @Test
+  public void worksNoFork()
+      throws DataConversionException {
+    workUnitState.getJobState()
+        .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY,
+            "insecure_shift");
+
+    converter.init(workUnitState);
+    assertSingleDecryptedRecord(converter.convertRecord("", sampleRecord, workUnitState));
+  }
+
+  /**
+   * Asserts that exactly one record was produced, that its payload equals the expected decrypted bytes,
+   * and that the codec's tag is the first Transfer-Encoding entry of its global metadata.
+   */
+  private void assertSingleDecryptedRecord(Iterable<RecordWithMetadata<byte[]>> records) {
+    Iterator<RecordWithMetadata<byte[]>> recordIterator = records.iterator();
+    Assert.assertTrue(recordIterator.hasNext());
+
+    RecordWithMetadata<byte[]> record = recordIterator.next();
+
+    Assert.assertFalse(recordIterator.hasNext());
+    Assert.assertEquals(record.getMetadata().getGlobalMetadata().getTransferEncoding().get(0), insecureShiftTag);
+    Assert.assertEquals(record.getRecord(), shiftedValue);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
new file mode 100644
index 0000000..7b6baca
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+
+/**
+ * Converter that decrypts a {@link RecordWithMetadata} holding encrypted bytes into a
+ * {@link RecordWithMetadata} holding the decrypted bytes. The payload is passed through a decrypting
+ * {@link StreamCodec}, and that codec's tag is appended to the Transfer-Encoding list of the record's
+ * global metadata.
+ *
+ * <p>Concrete subclasses decide how the codec is built by implementing
+ * {@link #buildDecryptor(WorkUnitState)}.
+ */
+@Slf4j
+public abstract class EncryptedSerializedRecordToSerializedRecordConverterBase extends Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> {
+  /** Codec created in {@link #init(WorkUnitState)}; null until init has run. */
+  private StreamCodec decryptor;
+
+  @Override
+  public Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> init(
+      WorkUnitState workUnit) {
+    super.init(workUnit);
+    this.decryptor = buildDecryptor(workUnit);
+    return this;
+  }
+
+  /**
+   * Creates the {@link StreamCodec} used to decrypt every record. Invoked once from
+   * {@link #init(WorkUnitState)}.
+   *
+   * @param config work unit state carrying the decryption settings
+   * @return the codec to decrypt with
+   */
+  protected abstract StreamCodec buildDecryptor(WorkUnitState config);
+
+  /** The payload is opaque bytes, so no schema is carried forward; always returns an empty string. */
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return "";
+  }
+
+  /**
+   * Decrypts the record's bytes with the configured codec.
+   *
+   * @return a singleton containing the decrypted record. It reuses the input record's metadata object,
+   *     which this method has already modified by appending the codec tag.
+   * @throws DataConversionException wrapping any exception raised while decrypting
+   */
+  @Override
+  public Iterable<RecordWithMetadata<byte[]>> convertRecord(String outputSchema, RecordWithMetadata<byte[]> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    try {
+      byte[] plaintext;
+      try (InputStream decoded = decryptor.decodeInputStream(new ByteArrayInputStream(inputRecord.getRecord()))) {
+        plaintext = IOUtils.toByteArray(decoded);
+      }
+
+      // Record that this codec has been applied to the payload.
+      inputRecord.getMetadata().getGlobalMetadata().addTransferEncoding(decryptor.getTag());
+      return Collections.singleton(new RecordWithMetadata<byte[]>(plaintext, inputRecord.getMetadata()));
+    } catch (Exception e) {
+      throw new DataConversionException(e);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
new file mode 100644
index 0000000..c686091
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * Converter that unwraps a JSON envelope stored as the byte payload of a {@link RecordWithMetadata}.
+ *
+ * <p>The payload must parse to a JSON object with these fields:
+ * <ul>
+ *   <li>{@code "r"} (required) is the record and must be a JSON string; it becomes the output record.</li>
+ *   <li>{@code "rMd"} (optional) is an object whose {@code "recordMetadata"} entry, if present, is copied
+ *   into the record metadata of a fresh {@link Metadata}.</li>
+ * </ul>
+ * The metadata attached to the input record is not consulted.
+ */
+public class EnvelopedRecordWithMetadataToRecordWithMetadata extends Converter<String, Object, RecordWithMetadata<byte[]>, RecordWithMetadata<?>> {
+
+  private static final String RECORD_KEY = "r";
+  private static final String METADATA_KEY = "rMd";
+  private static final String METADATA_RECORD_KEY = "recordMetadata";
+
+  private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  private static final JsonFactory jsonFactory = new JsonFactory();
+
+  /** Schemas are not tracked for enveloped records; always returns an empty string. */
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return "";
+  }
+
+  /**
+   * Parses the envelope and returns the inner record together with its record-level metadata.
+   *
+   * @param outputSchema ignored
+   * @param inputRecord record whose payload holds the JSON envelope bytes
+   * @param workUnit ignored
+   * @return a singleton containing the unwrapped record
+   * @throws DataConversionException in any of these cases:
+   *     <ul>
+   *       <li>the payload is empty or is not valid JSON;</li>
+   *       <li>the payload lacks an {@code "r"} field, or that field is not a string;</li>
+   *       <li>the record metadata cannot be mapped.</li>
+   *     </ul>
+   */
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(Object outputSchema, RecordWithMetadata<byte[]> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+
+    try (JsonParser parser = jsonFactory.createJsonParser(inputRecord.getRecord())) {
+      parser.setCodec(objectMapper);
+      JsonNode envelope = parser.readValueAsTree();
+      // readValueAsTree() can return null when there is no JSON content at all (e.g. an empty payload).
+      // Report that as a conversion failure instead of letting a NullPointerException escape.
+      if (envelope == null) {
+        throw new DataConversionException("Input data is empty.");
+      }
+
+      // extract the required record
+      if (!envelope.has(RECORD_KEY)) {
+        throw new DataConversionException("Input data does not have record.");
+      }
+      JsonNode recordNode = envelope.get(RECORD_KEY);
+      // getTextValue() returns null for non-string nodes (objects, numbers, JSON null). Without this
+      // check, such input would silently produce a record with a null payload.
+      if (!recordNode.isTextual()) {
+        throw new DataConversionException("Record field \"" + RECORD_KEY + "\" is not a string.");
+      }
+      String record = recordNode.getTextValue();
+
+      // extract the optional record metadata
+      Metadata md = new Metadata();
+      if (envelope.has(METADATA_KEY) && envelope.get(METADATA_KEY).has(METADATA_RECORD_KEY)) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> recordMetadata =
+            objectMapper.readValue(envelope.get(METADATA_KEY).get(METADATA_RECORD_KEY), Map.class);
+        // readValue can yield null (e.g. for an explicit JSON null); skip it rather than failing in putAll.
+        if (recordMetadata != null) {
+          md.getRecordMetadata().putAll(recordMetadata);
+        }
+      }
+
+      return Collections.singleton(new RecordWithMetadata<>(record, md));
+    } catch (IOException e) {
+      throw new DataConversionException(e);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
new file mode 100644
index 0000000..8d247a7
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for {@link EnvelopedRecordWithMetadataToRecordWithMetadata}.
+ */
+@Test
+public class EnvelopedRecordWithMetadataToRecordWithMetadataTest {
+
+  private final ObjectMapper objectMapper = new ObjectMapper();
+  private final EnvelopedRecordWithMetadataToRecordWithMetadata converter =
+      new EnvelopedRecordWithMetadataToRecordWithMetadata();
+
+  @Test
+  public void testSuccessWithRecord() throws DataConversionException, IOException {
+    String innerRecord = "abracadabra";
+
+    HashMap<String, Object> envelope = new HashMap<>();
+    envelope.put("r", innerRecord);
+    envelope.put("rMd", sampleMetadata());
+
+    Iterator<RecordWithMetadata<?>> results =
+        converter.convertRecord(null, toInputRecord(envelope), null).iterator();
+    Assert.assertTrue(results.hasNext());
+
+    RecordWithMetadata<?> output = results.next();
+    Assert.assertEquals(output.getRecord(), innerRecord);
+    Assert.assertEquals(output.getMetadata().getRecordMetadata().get("test1"), "test2");
+  }
+
+  @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Input data does not have record.")
+  public void testFailureWithoutRecord() throws DataConversionException, IOException {
+    // The envelope carries metadata only, without the "r" field.
+    HashMap<String, Object> envelope = new HashMap<>();
+    envelope.put("rMd", sampleMetadata());
+
+    converter.convertRecord(null, toInputRecord(envelope), null);
+  }
+
+  /** Builds a {@link Metadata} whose record metadata holds the single entry "test1" -> "test2". */
+  private static Metadata sampleMetadata() {
+    Metadata md = new Metadata();
+    md.getRecordMetadata().put("test1", "test2");
+    return md;
+  }
+
+  /** Serializes {@code envelope} to JSON and wraps the bytes in a record with no metadata. */
+  private RecordWithMetadata<byte[]> toInputRecord(HashMap<String, Object> envelope) {
+    JsonNode tree = objectMapper.valueToTree(envelope);
+    return new RecordWithMetadata<>(tree.toString().getBytes(), null);
+  }
+
+}
+}