You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/03/19 18:40:42 UTC

incubator-gobblin git commit: [GOBBLIN-427] Add a decryption converter

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 0795fa7a0 -> b7f123f77


[GOBBLIN-427] Add a decryption converter

Closes #2304 from xzhang27/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b7f123f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b7f123f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b7f123f7

Branch: refs/heads/master
Commit: b7f123f77a58c690a9acf89f6d3168aeda259a17
Parents: 0795fa7
Author: Xiang <xn...@linkedin.com>
Authored: Mon Mar 19 11:39:46 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Mar 19 11:39:55 2018 -0700

----------------------------------------------------------------------
 .../gobblin/crypto/EncryptionConfigParser.java  |  4 +-
 .../crypto/EncryptionConfigParserTest.java      |  4 +-
 .../copy/converter/DecryptConverter.java        |  2 +-
 ...alizedRecordToSerializedRecordConverter.java | 44 +++++++++
 ...ordToEncryptedSerializedRecordConverter.java |  2 +-
 .../StringFieldEncryptorConverter.java          |  2 +-
 ...edRecordToSerializedRecordConverterTest.java | 95 ++++++++++++++++++++
 ...edRecordToSerializedRecordConverterBase.java | 79 ++++++++++++++++
 ...dRecordWithMetadataToRecordWithMetadata.java | 81 +++++++++++++++++
 ...ordWithMetadataToRecordWithMetadataTest.java | 75 ++++++++++++++++
 10 files changed, 382 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
index 324365a..900b616 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
@@ -52,6 +52,7 @@ public class EncryptionConfigParser {
    */
   static final String WRITER_ENCRYPT_PREFIX = ConfigurationKeys.WRITER_PREFIX + ".encrypt";
   static final String CONVERTER_ENCRYPT_PREFIX = "converter.encrypt";
+  static final String CONVERTER_DECRYPT_PREFIX = "converter.decrypt";
 
   public static final String ENCRYPTION_ALGORITHM_KEY = "algorithm";
   public static final String ENCRYPTION_KEYSTORE_PATH_KEY = "keystore_path";
@@ -72,7 +73,8 @@ public class EncryptionConfigParser {
    * enum maps entity type to a configuration prefix.
    */
   public enum EntityType {
-    CONVERTER(CONVERTER_ENCRYPT_PREFIX),
+    CONVERTER_ENCRYPT(CONVERTER_ENCRYPT_PREFIX),
+    CONVERTER_DECRYPT(CONVERTER_DECRYPT_PREFIX),
     WRITER(WRITER_ENCRYPT_PREFIX);
 
     private final String configPrefix;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
index 68fbf37..54b52d4 100644
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/crypto/EncryptionConfigParserTest.java
@@ -97,7 +97,7 @@ public class EncryptionConfigParserTest {
         "keyname");
     wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar");
 
-    Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, wuState);
+    Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, wuState);
     Assert.assertNotNull(parsedProperties, "Expected parser to only return one record");
     Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config");
 
@@ -125,7 +125,7 @@ public class EncryptionConfigParserTest {
         "keyname");
     wuState.setProp(EncryptionConfigParser.CONVERTER_ENCRYPT_PREFIX + "abc.def", "foobar");
 
-    Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, entityName, wuState);
+    Map<String, Object> parsedProperties = EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, entityName, wuState);
     Assert.assertNotNull(parsedProperties, "Expected parser to only return one record");
     Assert.assertEquals(parsedProperties.size(), 4, "Did not expect abc.def to be picked up in config");
     Assert.assertEquals(EncryptionConfigParser.getEncryptionType(parsedProperties), "aes_rotating");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
index d074046..d9c6353 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/converter/DecryptConverter.java
@@ -59,7 +59,7 @@ public class DecryptConverter extends DistcpConverter {
   @Override
   public Converter<String, String, FileAwareInputStream, FileAwareInputStream> init(WorkUnitState workUnit) {
     Map<String, Object> config =
-        EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, workUnit);
+        EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, workUnit);
 
    if (config == null) {
      // Backwards compatibility check: if no config was passed in via the standard config, revert back to GPG

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
new file mode 100644
index 0000000..2088eee
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Map;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.crypto.EncryptionFactory;
+
+
+/**
+ * Specific implementation of {@link EncryptedSerializedRecordToSerializedRecordConverterBase} that uses Gobblin's
+ * {@link EncryptionFactory} to build the proper decryption codec based on config.
+ */
+public class EncryptedSerializedRecordToSerializedRecordConverter extends EncryptedSerializedRecordToSerializedRecordConverterBase {
+  @Override
+  protected StreamCodec buildDecryptor(WorkUnitState config) {
+    Map<String, Object> decryptionConfig =
+        EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_DECRYPT,
+            getClass().getSimpleName(), config);
+    if (decryptionConfig == null) {
+      throw new IllegalStateException("No decryption config specified in job - can't decrypt!");
+    }
+
+    return EncryptionFactory.buildStreamCryptoProvider(decryptionConfig);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
index 2c830a9..be550c7 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/SerializedRecordToEncryptedSerializedRecordConverter.java
@@ -32,7 +32,7 @@ public class SerializedRecordToEncryptedSerializedRecordConverter extends Serial
   @Override
   protected StreamCodec buildEncryptor(WorkUnitState config) {
     Map<String, Object> encryptionConfig =
-        EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), config);
+        EncryptionConfigParser.getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), config);
     if (encryptionConfig == null) {
       throw new IllegalStateException("No encryption config specified in job - can't encrypt!");
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
index d6e8de1..f242ec5 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
@@ -47,7 +47,7 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, DATA> extends Conver
   public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) {
     super.init(workUnit);
     Map<String, Object> config = EncryptionConfigParser
-        .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER, getClass().getSimpleName(), workUnit);
+        .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_ENCRYPT, getClass().getSimpleName(), workUnit);
     encryptor = EncryptionFactory.buildStreamCryptoProvider(config);
 
     String fieldsToEncryptConfig = workUnit.getProp(FIELDS_TO_ENCRYPT_CONFIG_NAME, null);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
new file mode 100644
index 0000000..efaeb16
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Iterator;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.testng.Assert;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+public class EncryptedSerializedRecordToSerializedRecordConverterTest {
+
+  private WorkUnitState workUnitState;
+  private EncryptedSerializedRecordToSerializedRecordConverter converter;
+  private RecordWithMetadata<byte[]> sampleRecord;
+  private byte[] shiftedValue;
+  private String insecureShiftTag;
+
+  private final String DECRYPT_PREFIX = "converter.decrypt.";
+
+  @BeforeTest
+  public void setUp() {
+    workUnitState = new WorkUnitState();
+    converter = new EncryptedSerializedRecordToSerializedRecordConverter();
+    sampleRecord = new RecordWithMetadata<>(new byte[]{'b', 'c', 'd', 'e'}, new Metadata());
+    shiftedValue = new byte[]{'a', 'b', 'c', 'd'};
+    insecureShiftTag = InsecureShiftCodec.TAG;
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void throwsIfMisconfigured()
+      throws DataConversionException {
+    converter.init(workUnitState);
+    converter.convertRecord("", sampleRecord, workUnitState);
+  }
+
+  @Test
+  public void worksWithFork()
+      throws DataConversionException {
+    workUnitState.setProp(ConfigurationKeys.FORK_BRANCH_ID_KEY, 2);
+    workUnitState.getJobState()
+        .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY + ".2",
+            "insecure_shift");
+
+    converter.init(workUnitState);
+    Iterable<RecordWithMetadata<byte[]>> records = converter.convertRecord("", sampleRecord, workUnitState);
+    Iterator<RecordWithMetadata<byte[]>> recordIterator = records.iterator();
+    Assert.assertTrue(recordIterator.hasNext());
+
+    RecordWithMetadata<byte[]> record = recordIterator.next();
+
+    Assert.assertFalse(recordIterator.hasNext());
+    Assert.assertEquals(record.getMetadata().getGlobalMetadata().getTransferEncoding().get(0), insecureShiftTag);
+    Assert.assertEquals(record.getRecord(), shiftedValue);
+  }
+
+  @Test
+  public void worksNoFork()
+      throws DataConversionException {
+    workUnitState.getJobState()
+        .setProp(DECRYPT_PREFIX + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY,
+            "insecure_shift");
+    converter.init(workUnitState);
+    Iterable<RecordWithMetadata<byte[]>> records = converter.convertRecord("", sampleRecord, workUnitState);
+    Iterator<RecordWithMetadata<byte[]>> recordIterator = records.iterator();
+    Assert.assertTrue(recordIterator.hasNext());
+
+    RecordWithMetadata<byte[]> record = recordIterator.next();
+
+    Assert.assertFalse(recordIterator.hasNext());
+    Assert.assertEquals(record.getMetadata().getGlobalMetadata().getTransferEncoding().get(0), insecureShiftTag);
+    Assert.assertEquals(record.getRecord(), shiftedValue);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
new file mode 100644
index 0000000..7b6baca
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/converter/EncryptedSerializedRecordToSerializedRecordConverterBase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.type.RecordWithMetadata;
+
+
+/**
+ * A converter that converts an encrypted {@link org.apache.gobblin.type.SerializedRecordWithMetadata} to
+ * a decrypted {@link org.apache.gobblin.type.SerializedRecordWithMetadata}. The tag of the decryption codec
+ * used is appended to the Transfer-Encoding of the new record.
+ */
+@Slf4j
+public abstract class EncryptedSerializedRecordToSerializedRecordConverterBase extends Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> {
+  private StreamCodec decryptor;
+
+  @Override
+  public Converter<String, String, RecordWithMetadata<byte[]>, RecordWithMetadata<byte[]>> init(
+      WorkUnitState workUnit) {
+    super.init(workUnit);
+    decryptor = buildDecryptor(workUnit);
+    return this;
+  }
+
+  /**
+   * Build the StreamCodec that will be used to decrypt each byte record. Must be provided by concrete
+   * implementations of this class.
+   */
+  protected abstract StreamCodec buildDecryptor(WorkUnitState config);
+
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return "";
+  }
+
+  @Override
+  public Iterable<RecordWithMetadata<byte[]>> convertRecord(String outputSchema, RecordWithMetadata<byte[]> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    try {
+      ByteArrayInputStream inputStream = new ByteArrayInputStream(inputRecord.getRecord());
+      byte[] decryptedBytes;
+      try (InputStream decryptedInputStream = decryptor.decodeInputStream(inputStream)) {
+        decryptedBytes = IOUtils.toByteArray(decryptedInputStream);
+      }
+      inputRecord.getMetadata().getGlobalMetadata().addTransferEncoding(decryptor.getTag());
+
+      RecordWithMetadata<byte[]> serializedRecord =
+          new RecordWithMetadata<byte[]>(decryptedBytes, inputRecord.getMetadata());
+      return Collections.singleton(serializedRecord);
+    } catch (Exception e) {
+      throw new DataConversionException(e);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
new file mode 100644
index 0000000..c686091
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/main/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A converter that takes a {@link RecordWithMetadata} whose payload is a JSON envelope and unwraps it. It reads
+ * two fields: the required "r" field holding the record as a string, and the optional "rMd" field whose
+ * "recordMetadata" entries are copied into the output record's metadata.
+ */
+public class EnvelopedRecordWithMetadataToRecordWithMetadata extends Converter<String, Object, RecordWithMetadata<byte[]>, RecordWithMetadata<?>>  {
+
+  private static final String RECORD_KEY = "r";
+  private static final String METADATA_KEY = "rMd";
+  private static final String METADATA_RECORD_KEY = "recordMetadata";
+
+  private static final ObjectMapper objectMapper = new ObjectMapper().configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+  private static final JsonFactory jsonFactory = new JsonFactory();
+
+  @Override
+  public String convertSchema(String inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    return "";
+  }
+
+  @Override
+  public Iterable<RecordWithMetadata<?>> convertRecord(Object outputSchema, RecordWithMetadata<byte[]> inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+
+    try {
+      try (JsonParser parser = jsonFactory.createJsonParser(inputRecord.getRecord())) {
+        parser.setCodec(objectMapper);
+        JsonNode jsonNode = parser.readValueAsTree();
+
+        // Extract the required record field
+        if (!jsonNode.has(RECORD_KEY)) {
+          throw new DataConversionException("Input data does not have record.");
+        }
+        String record = jsonNode.get(RECORD_KEY).getTextValue();
+
+        // Extract metadata field
+        Metadata md = new Metadata();
+        if (jsonNode.has(METADATA_KEY) && jsonNode.get(METADATA_KEY).has(METADATA_RECORD_KEY)) {
+          md.getRecordMetadata().putAll(objectMapper.readValue(jsonNode.get(METADATA_KEY).get(METADATA_RECORD_KEY), Map.class));
+        }
+
+        return Collections.singleton(new RecordWithMetadata<>(record, md));
+      }
+    } catch (IOException e) {
+      throw new DataConversionException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b7f123f7/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
new file mode 100644
index 0000000..8d247a7
--- /dev/null
+++ b/gobblin-modules/gobblin-metadata/src/test/java/org/apache/gobblin/converter/EnvelopedRecordWithMetadataToRecordWithMetadataTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.gobblin.metadata.types.Metadata;
+import org.apache.gobblin.type.RecordWithMetadata;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test
+public class EnvelopedRecordWithMetadataToRecordWithMetadataTest {
+
+  @Test
+  public void testSuccessWithRecord() throws DataConversionException, IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+    String innerRecord = "abracadabra";
+
+    // Build the input record
+    HashMap<String, Object> map = new HashMap<>();
+    map.put("r", innerRecord);
+    Metadata md = new Metadata();
+    md.getRecordMetadata().put("test1", "test2");
+    map.put("rMd", md);
+    JsonNode jsonNode = objectMapper.valueToTree(map);
+    RecordWithMetadata<byte[]> inputRecord = new RecordWithMetadata<>(jsonNode.toString().getBytes(), null);
+
+    EnvelopedRecordWithMetadataToRecordWithMetadata converter = new EnvelopedRecordWithMetadataToRecordWithMetadata();
+    Iterator<RecordWithMetadata<?>> iterator =
+        converter.convertRecord(null, inputRecord, null).iterator();
+
+    Assert.assertTrue(iterator.hasNext());
+
+    RecordWithMetadata<?> outputRecord = iterator.next();
+
+    Assert.assertEquals(outputRecord.getRecord(), innerRecord);
+    Assert.assertEquals(outputRecord.getMetadata().getRecordMetadata().get("test1"), "test2");
+  }
+
+  @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Input data does not have record.")
+  public void testFailureWithoutRecord() throws DataConversionException, IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // Build the input record without data
+    HashMap<String, Object> map = new HashMap<>();
+    Metadata md = new Metadata();
+    md.getRecordMetadata().put("test1", "test2");
+    map.put("rMd", md);
+    JsonNode jsonNode = objectMapper.valueToTree(map);
+    RecordWithMetadata<byte[]> inputRecord = new RecordWithMetadata<>(jsonNode.toString().getBytes(), null);
+
+    EnvelopedRecordWithMetadataToRecordWithMetadata converter = new EnvelopedRecordWithMetadataToRecordWithMetadata();
+    converter.convertRecord(null, inputRecord, null);
+  }
+
+}