You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/04/09 06:04:26 UTC
incubator-gobblin git commit: [GOBBLIN-459] Support encryption and
decryption of strings + arrays of strings
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 461999eda -> 3323543b6
[GOBBLIN-459] Support encryption and decryption of strings + arrays of strings
Closes #2330 from eogren/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/3323543b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/3323543b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/3323543b
Branch: refs/heads/master
Commit: 3323543b6cfb5e5cfbfbea85783a5976f3a76267
Parents: 461999e
Author: Eric Ogren <eo...@linkedin.com>
Authored: Sun Apr 8 23:04:42 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Sun Apr 8 23:04:42 2018 -0700
----------------------------------------------------------------------
.../gobblin/recordaccess/RecordAccessor.java | 7 +
.../recordaccess/AvroGenericRecordAccessor.java | 60 +++++++-
.../AvroGenericRecordAccessorTest.java | 49 ++++++-
.../RecordAccessorProviderFactoryTest.java | 16 +++
.../resources/converter/fieldPickInput.avsc | 3 +-
.../converter/fieldPickInput_arrays.avro | Bin 0 -> 552 bytes
.../AvroStringFieldDecryptorConverter.java | 34 +++++
.../StringFieldDecryptorConverter.java | 131 +++++++++++++++++
.../StringFieldEncryptorConverter.java | 45 ++++--
.../AvroStringFieldDecryptorConverterTest.java | 141 +++++++++++++++++++
.../AvroStringFieldEncryptorConverterTest.java | 62 +++++++-
.../test/resources/fieldPickInput_arrays.avro | Bin 0 -> 552 bytes
12 files changed, 525 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
index 5791771..10f2fb3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/recordaccess/RecordAccessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.recordaccess;
+import java.util.List;
import java.util.Map;
import org.apache.gobblin.annotation.Alpha;
@@ -72,14 +73,19 @@ public interface RecordAccessor {
* IncorrectTypeException if the underlying types do not match. Getters should not
* try to do any type coercion -- for example, getAsInt for a value that is the string "1"
* should throw an IncorrectTypeException.
+ *
+ * The get*Generic functions should return the following object types:
+ * String, Integer, Long, or a List of them.
*/
Map<String, String> getMultiAsString(String fieldName);
Map<String, Integer> getMultiAsInt(String fieldName);
Map<String, Long> getMultiAsLong(String fieldName);
+ Map<String, Object> getMultiGeneric(String fieldName);
String getAsString(String fieldName);
Integer getAsInt(String fieldName);
Long getAsLong(String fieldName);
+ Object getGeneric(String fieldName);
/*
* Set new values for an object. Should throw a FieldDoesNotExistException runtime exception if fieldName
@@ -88,6 +94,7 @@ public interface RecordAccessor {
void set(String fieldName, String value);
void set(String fieldName, Integer value);
void set(String fieldName, Long value);
+ void setStringArray(String fieldName, List<String> value);
void setToNull(String fieldName);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
index d51b5db..620933d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.recordaccess;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -23,6 +24,7 @@ import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
@@ -48,7 +50,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
@Override
public Map<String, String> getMultiAsString(String fieldName) {
- Map<String, Object> vals = getMultiAsObject(fieldName);
+ Map<String, Object> vals = getMultiGeneric(fieldName);
Map<String, String> ret = new HashMap<>();
for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -69,9 +71,12 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
return convertToString(fieldName, obj);
}
+
private String convertToString(String fieldName, Object obj) {
if (obj == null) {
return null;
+ } else if (obj instanceof String) {
+ return (String)obj;
} else if (obj instanceof Utf8) {
return obj.toString();
} else {
@@ -81,7 +86,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
@Override
public Map<String, Integer> getMultiAsInt(String fieldName) {
- Map<String, Object> vals = getMultiAsObject(fieldName);
+ Map<String, Object> vals = getMultiGeneric(fieldName);
Map<String, Integer> ret = new HashMap<>();
for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -107,7 +112,7 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
@Override
public Map<String, Long> getMultiAsLong(String fieldName) {
- Map<String, Object> vals = getMultiAsObject(fieldName);
+ Map<String, Object> vals = getMultiGeneric(fieldName);
Map<String, Long> ret = new HashMap<>();
for (Map.Entry<String, Object> entry : vals.entrySet()) {
@@ -151,8 +156,36 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
return obj.isPresent() ? obj.get() : null;
}
- private Map<String, Object> getMultiAsObject(String fieldName) {
- return AvroUtils.getMultiFieldValue(record, fieldName);
+ @Override
+ public Map<String, Object> getMultiGeneric(String fieldName) {
+ Map<String, Object> vals = AvroUtils.getMultiFieldValue(record, fieldName);
+ for (Map.Entry<String, Object> entry: vals.entrySet()) {
+ vals.put(entry.getKey(), convertAvroToJava(entry.getKey(), entry.getValue()));
+ }
+
+ return vals;
+ }
+
+ @Override
+ public Object getGeneric(String fieldName) {
+ Object val = getAsObject(fieldName);
+ return convertAvroToJava(fieldName, val);
+ }
+
+ private Object convertAvroToJava(String fieldName, Object val) {
+ if (val == null || val instanceof String || val instanceof Long || val instanceof Integer) {
+ return val;
+ }
+
+ if (val instanceof Utf8) {
+ return convertToString(fieldName, val);
+ }
+
+ if (val instanceof GenericArray) {
+ return convertToList(fieldName, (GenericArray) val);
+ }
+
+ throw new IllegalArgumentException("Don't know how to parse object of type " + val.getClass().getCanonicalName());
}
@Override
@@ -171,6 +204,13 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
}
@Override
+ public void setStringArray(String fieldName, List<String> value) {
+ GenericData.Array<String> avroArray = new GenericData.Array<>(
+ Schema.createArray(Schema.create(Schema.Type.STRING)), value);
+ set(fieldName, avroArray);
+ }
+
+ @Override
public void setToNull(String fieldName) {
set(fieldName, (Object) null);
}
@@ -225,4 +265,14 @@ public class AvroGenericRecordAccessor implements RecordAccessor {
throw new FieldDoesNotExistException("Field not found setting name " + fieldName, e);
}
}
+
+ @SuppressWarnings("unchecked")
+ private List convertToList(String fieldName, GenericArray arr) {
+ List ret = new ArrayList();
+ for (int i = 0; i < arr.size(); i++) {
+ ret.add(convertAvroToJava(fieldName + "." + String.valueOf(i), arr.get(i)));
+ }
+
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
index ed43aee..17e46b0 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/AvroGenericRecordAccessorTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.recordaccess;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
@@ -31,12 +32,15 @@ import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.ITestResult;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableList;
+
public class AvroGenericRecordAccessorTest {
private Schema recordSchema;
@@ -112,6 +116,39 @@ public class AvroGenericRecordAccessorTest {
Assert.assertEquals(accessor.getAsString("nestedRecords.1.fieldToEncrypt"), "val1");
}
+ @Test
+ public void testSetStringArray() throws IOException {
+ List<String> quotes = ImmutableList.of("abracadabra", "hocuspocus");
+ accessor.setStringArray("favorite_quotes", quotes);
+
+ Assert.assertEquals(accessor.getGeneric("favorite_quotes"), quotes);
+ }
+
+ @Test
+ public void testGetStringArrayUtf8() throws IOException {
+ // Expectation: Even though we read an Avro object with UTF8 underneath, the accessor converts it into a
+ // Java String
+ List<String> expectedQuotes = ImmutableList.of("abc", "defg");
+
+ GenericData.Array<Utf8> strings = new GenericData.Array<Utf8>(2, Schema.createArray(Schema.create(Schema.Type.STRING)));
+ expectedQuotes.forEach(s -> strings.add(new Utf8(s)));
+ record.put("favorite_quotes", strings);
+
+ Assert.assertEquals(accessor.getGeneric("favorite_quotes"), expectedQuotes);
+ }
+
+ @Test
+ public void testGetMultiConvertsStrings() throws IOException {
+ updateRecordFromTestResource("converter/fieldPickInput", "converter/fieldPickInput_arrays.avro");
+ Map<String, Object> ret = accessor.getMultiGeneric("favorite_quotes");
+ Object val = ret.get("favorite_quotes");
+
+ Assert.assertTrue(val instanceof List);
+ List castedVal = (List)val;
+ Assert.assertEquals(2, castedVal.size());
+ Assert.assertEquals("hello world", castedVal.get(0));
+ Assert.assertEquals("foobar", castedVal.get(1));
+ }
@Test
public void testSetValueFromArray() throws IOException {
@@ -184,15 +221,23 @@ public class AvroGenericRecordAccessorTest {
record.put("created", 0L);
}
- private void updateRecordFromTestResource(String resourceName)
+ private void updateRecordFromTestResource(String resourceName) throws IOException {
+ updateRecordFromTestResource(resourceName, null);
+ }
+
+ private void updateRecordFromTestResource(String resourceName, String avroFileName)
throws IOException {
+ if (avroFileName == null) {
+ avroFileName = resourceName + ".avro";
+ }
+
recordSchema = new Schema.Parser().parse(
getClass().getClassLoader().getResourceAsStream(resourceName + ".avsc")
);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(recordSchema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(
- new File(getClass().getClassLoader().getResource(resourceName + ".avro").getPath()), reader);
+ new File(getClass().getClassLoader().getResource(avroFileName).getPath()), reader);
Assert.assertTrue(dataFileReader.hasNext());
record = dataFileReader.next(record);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
index a6a8ce0..3ed37f4 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/recordaccess/RecordAccessorProviderFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.recordaccess;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
@@ -121,6 +122,21 @@ public class RecordAccessorProviderFactoryTest {
}
@Override
+ public Map<String, Object> getMultiGeneric(String fieldName) {
+ return null;
+ }
+
+ @Override
+ public Object getGeneric(String fieldName) {
+ return null;
+ }
+
+ @Override
+ public void setStringArray(String fieldName, List<String> value) {
+
+ }
+
+ @Override
public Map<String, String> getMultiAsString(String fieldName) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
index 29b40a1..4da3fdf 100644
--- a/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
+++ b/gobblin-core/src/test/resources/converter/fieldPickInput.avsc
@@ -7,6 +7,7 @@
{"name": "favorite_color", "type": ["string", "null"]},
{"name": "date_of_birth", "type": "long"},
{"name": "last_modified", "type": "long"},
- {"name": "created", "type": "long"}
+ {"name": "created", "type": "long"},
+ {"name": "favorite_quotes", "type": [{ "type": "array", "items": "string"}, "null"], "default": null}
]
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro
new file mode 100644
index 0000000..c10a607
Binary files /dev/null and b/gobblin-core/src/test/resources/converter/fieldPickInput_arrays.avro differ
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
new file mode 100644
index 0000000..59408c3
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.gobblin.recordaccess.AvroGenericRecordAccessor;
+import org.apache.gobblin.recordaccess.RecordAccessor;
+
+
+/**
+ * StringFieldDecryptor that works on Avro GenericRecords.
+ */
+public class AvroStringFieldDecryptorConverter extends StringFieldDecryptorConverter<Schema, GenericRecord> {
+ @Override
+ protected RecordAccessor getRecordAccessor(GenericRecord record) {
+ return new AvroGenericRecordAccessor(record);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
new file mode 100644
index 0000000..26f4bbd
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldDecryptorConverter.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.codec.StreamCodec;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.crypto.EncryptionFactory;
+import org.apache.gobblin.recordaccess.RecordAccessor;
+
+import com.google.common.base.Splitter;
+
+
+/**
+ * Converter that can decrypt a string field, or an array-of-strings field, in place. (Note: that
+ * means the incoming record will be mutated!) Assumes that every value to decrypt is of string
+ * type and that the decryption algorithm chosen will output a UTF-8 encoded byte array.
+ */
+public abstract class StringFieldDecryptorConverter<SCHEMA, DATA> extends Converter<SCHEMA, SCHEMA, DATA, DATA> {
+ public static final String FIELDS_TO_DECRYPT_CONFIG_NAME = "converter.fieldsToDecrypt";
+
+ private StreamCodec decryptor;
+ private List<String> fieldsToDecrypt;
+
+ @Override
+ public Converter<SCHEMA, SCHEMA, DATA, DATA> init(WorkUnitState workUnit) {
+ super.init(workUnit);
+ Map<String, Object> config = EncryptionConfigParser
+ .getConfigForBranch(EncryptionConfigParser.EntityType.CONVERTER_DECRYPT, getClass().getSimpleName(), workUnit);
+ decryptor = EncryptionFactory.buildStreamCryptoProvider(config);
+
+ String fieldsToDecryptConfig = workUnit.getProp(FIELDS_TO_DECRYPT_CONFIG_NAME, null);
+ if (fieldsToDecryptConfig == null) {
+ throw new IllegalArgumentException("Must fill in the " + FIELDS_TO_DECRYPT_CONFIG_NAME + " config option!");
+ }
+
+ fieldsToDecrypt = Splitter.on(',').splitToList(fieldsToDecryptConfig);
+
+ return this;
+ }
+
+ @Override
+ public SCHEMA convertSchema(SCHEMA inputSchema, WorkUnitState workUnit)
+ throws SchemaConversionException {
+ return inputSchema;
+ }
+
+ @Override
+ public Iterable<DATA> convertRecord(SCHEMA outputSchema, DATA inputRecord, WorkUnitState workUnit)
+ throws DataConversionException {
+ RecordAccessor accessor = getRecordAccessor(inputRecord);
+
+ for (String field : fieldsToDecrypt) {
+ Map<String, Object> stringsToDecrypt = accessor.getMultiGeneric(field);
+ try {
+ for (Map.Entry<String, Object> entry : stringsToDecrypt.entrySet()) {
+ if (entry.getValue() instanceof String) {
+ String s = decryptString((String) entry.getValue());
+ accessor.set(entry.getKey(), s);
+ } else if (entry.getValue() instanceof List) {
+ List<String> decryptedValues = new ArrayList<>();
+ for (Object val : (List)entry.getValue()) {
+ if (!(val instanceof String)) {
+ throw new IllegalArgumentException("Expected List of Strings, but encountered a value of type "
+ + val.getClass().getCanonicalName());
+ }
+
+ decryptedValues.add(decryptString((String)val));
+ }
+
+ accessor.setStringArray(entry.getKey(), decryptedValues);
+ } else {
+ throw new IllegalArgumentException(
+ "Expected field to be of type String or List<String>, was " + entry.getValue().getClass()
+ .getCanonicalName());
+ }
+ }
+ } catch (IOException | IllegalArgumentException | IllegalStateException e) {
+ throw new DataConversionException("Error while encrypting field " + field + ": " + e.getMessage(), e);
+ }
+ }
+
+ return Collections.singleton(inputRecord);
+ }
+
+ protected List<String> getFieldsToDecrypt() {
+ return fieldsToDecrypt;
+ }
+
+ protected String decryptString(String val)
+ throws IOException {
+ byte[] encryptedBytes = val.getBytes(StandardCharsets.UTF_8);
+
+ ByteArrayInputStream inStream = new ByteArrayInputStream(encryptedBytes);
+
+ try (InputStream cipherStream = decryptor.decodeInputStream(inStream);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+ IOUtils.copy(cipherStream, outputStream);
+
+ byte[] decryptedBytes = outputStream.toByteArray();
+ return new String(decryptedBytes, StandardCharsets.UTF_8);
+ }
+ }
+
+ protected abstract RecordAccessor getRecordAccessor(DATA record);
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
index f242ec5..e9b21ed 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/converter/StringFieldEncryptorConverter.java
@@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -72,30 +73,48 @@ public abstract class StringFieldEncryptorConverter<SCHEMA, DATA> extends Conver
RecordAccessor accessor = getRecordAccessor(inputRecord);
for (String field : fieldsToEncrypt) {
- Map<String, String> stringsToEncrypt = accessor.getMultiAsString(field);
-
- for (Map.Entry<String, String> entry : stringsToEncrypt.entrySet()) {
- byte[] bytes = entry.getValue().getBytes(StandardCharsets.UTF_8);
-
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ Map<String, Object> stringsToEncrypt = accessor.getMultiGeneric(field);
+ for (Map.Entry<String, Object> entry : stringsToEncrypt.entrySet()) {
try {
- OutputStream cipherStream = encryptor.encodeOutputStream(outputStream);
- cipherStream.write(bytes);
- cipherStream.flush();
- cipherStream.close();
+ if (entry.getValue() instanceof String) {
+ accessor.set(entry.getKey(), encryptString((String) entry.getValue()));
+ } else if (entry.getValue() instanceof List) {
+ List<String> encryptedVals = new ArrayList<>();
+
+ for (Object val: (List)entry.getValue()) {
+ if (!(val instanceof String)) {
+ throw new IllegalArgumentException("Unexpected type " + val.getClass().getCanonicalName() +
+ " while encrypting field " + field);
+ }
+
+ encryptedVals.add(encryptString((String)val));
+ }
+
+ accessor.setStringArray(entry.getKey(), encryptedVals);
+ }
} catch (IOException | IllegalArgumentException | IllegalStateException e) {
throw new DataConversionException("Error while encrypting field " + field + ": " + e.getMessage(), e);
}
-
- byte[] cipherBytes = outputStream.toByteArray();
- accessor.set(entry.getKey(), new String(cipherBytes, StandardCharsets.UTF_8));
}
}
return Collections.singleton(inputRecord);
}
+ private String encryptString(String val)
+ throws IOException {
+ byte[] bytes = val.getBytes(StandardCharsets.UTF_8);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ OutputStream cipherStream = encryptor.encodeOutputStream(outputStream);
+ cipherStream.write(bytes);
+ cipherStream.flush();
+ cipherStream.close();
+ return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+ }
+
protected List<String> getFieldsToEncrypt() {
return fieldsToEncrypt;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
new file mode 100644
index 0000000..36be27d
--- /dev/null
+++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldDecryptorConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.test.TestUtils;
+import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+
+
+public class AvroStringFieldDecryptorConverterTest {
+
+ @Test
+ public void testConversion()
+ throws DataConversionException, IOException, SchemaConversionException {
+ AvroStringFieldDecryptorConverter converter = new AvroStringFieldDecryptorConverter();
+ WorkUnitState wuState = new WorkUnitState();
+
+ wuState.getJobState().setProp("converter.fieldsToDecrypt", "field1");
+ wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm", "insecure_shift");
+
+ converter.init(wuState);
+ GenericRecord inputRecord = TestUtils.generateRandomAvroRecord();
+
+ Schema inputSchema = inputRecord.getSchema();
+ Schema outputSchema = converter.convertSchema(inputSchema, wuState);
+
+ String fieldValue = (String) inputRecord.get("field1");
+
+ Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState);
+ GenericRecord decryptedRecord = recordIt.iterator().next();
+
+ Assert.assertEquals(outputSchema, inputSchema);
+ String decryptedValue = (String) decryptedRecord.get("field1");
+
+ InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, Object>newHashMap());
+ InputStream in = codec.decodeInputStream(new ByteArrayInputStream(fieldValue.getBytes(StandardCharsets.UTF_8)));
+ byte[] expectedDecryptedValue = new byte[in.available()];
+ in.read(expectedDecryptedValue);
+
+ Assert.assertEquals(new String(expectedDecryptedValue, StandardCharsets.UTF_8), decryptedValue);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testArrayDecryption()
+ throws DataConversionException, IOException, SchemaConversionException {
+ AvroStringFieldDecryptorConverter converter = new AvroStringFieldDecryptorConverter();
+ WorkUnitState wuState = new WorkUnitState();
+
+ wuState.getJobState().setProp("converter.fieldsToDecrypt", "array1");
+ wuState.getJobState().setProp("converter.decrypt.AvroStringFieldDecryptorConverter.algorithm", "insecure_shift");
+
+ converter.init(wuState);
+ GenericRecord inputRecord = generateRecordWithArrays();
+
+ Schema inputSchema = inputRecord.getSchema();
+ Schema outputSchema = converter.convertSchema(inputSchema, wuState);
+
+ GenericData.Array<String> fieldValue = (GenericData.Array<String>) inputRecord.get("array1");
+
+ Iterable<GenericRecord> recordIt = converter.convertRecord(outputSchema, inputRecord, wuState);
+ GenericRecord decryptedRecord = recordIt.iterator().next();
+
+ Assert.assertEquals(outputSchema, inputSchema);
+ GenericData.Array<String> decryptedValue = (GenericData.Array<String>) decryptedRecord.get("array1");
+
+ for (int i = 0; i < decryptedValue.size(); i++) {
+ assertDecryptedValuesEqual(decryptedValue.get(i), fieldValue.get(i));
+ }
+ }
+
+ private void assertDecryptedValuesEqual(String decryptedValue, String originalValue) throws IOException {
+ InsecureShiftCodec codec = new InsecureShiftCodec(Maps.<String, Object>newHashMap());
+ InputStream in = codec.decodeInputStream(new ByteArrayInputStream(originalValue.getBytes(StandardCharsets.UTF_8)));
+ byte[] expectedDecryptedValue = new byte[in.available()];
+ in.read(expectedDecryptedValue);
+
+ Assert.assertEquals(new String(expectedDecryptedValue, StandardCharsets.UTF_8), decryptedValue);
+ }
+
+ private GenericRecord getRecordFromFile(String path) throws IOException {
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>();
+ DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(path), reader);
+ while (dataFileReader.hasNext()) {
+ return dataFileReader.next();
+ }
+
+ return null;
+ }
+
+ private GenericRecord generateRecordWithArrays() {
+ ArrayList<Schema.Field> fields = new ArrayList<Schema.Field>();
+ String fieldName = "array1";
+ Schema fieldSchema = Schema.createArray(Schema.create(Schema.Type.STRING));
+ String docString = "doc";
+ fields.add(new Schema.Field(fieldName, fieldSchema, docString, null));
+ Schema schema = Schema.createRecord("name", docString, "test", false);
+ schema.setFields(fields);
+
+ GenericData.Record record = new GenericData.Record(schema);
+ GenericData.Array<String> arr = new GenericData.Array<>(2, fieldSchema);
+ arr.add("foobar");
+ arr.add("foobaz");
+
+ record.put("array1", arr);
+
+ return record;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
index e16401d..5078ae8 100644
--- a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
+++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/converter/AvroStringFieldEncryptorConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.converter;
import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -26,9 +27,14 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,6 +43,7 @@ import com.google.common.collect.Maps;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.test.TestUtils;
import org.apache.gobblin.test.crypto.InsecureShiftCodec;
+import org.testng.collections.Lists;
public class AvroStringFieldEncryptorConverterTest {
@@ -83,10 +90,61 @@ public class AvroStringFieldEncryptorConverterTest {
Assert.assertEquals(decryptedValues, origValues);
}
- private GenericRecord getRecordFromFile(String path) throws IOException {
+ /**
+  * Encrypts the {@code favorite_quotes} string array of a record loaded from
+  * {@code fieldPickInput_arrays.avro} using the {@code insecure_shift} algorithm, then checks
+  * that the schema is unchanged and that decoding every encrypted element with
+  * {@link InsecureShiftCodec} gives back the original strings.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testEncryptionOfArray()
+     throws SchemaConversionException, DataConversionException, IOException {
+   final String fieldName = "favorite_quotes";
+
+   AvroStringFieldEncryptorConverter encryptor = new AvroStringFieldEncryptorConverter();
+   WorkUnitState state = new WorkUnitState();
+
+   state.getJobState().setProp("converter.fieldsToEncrypt", fieldName);
+   state.getJobState().setProp("converter.encrypt.algorithm", "insecure_shift");
+
+   encryptor.init(state);
+
+   String inputPath = getClass().getClassLoader().getResource("fieldPickInput_arrays.avro").getPath();
+   GenericRecord source = getRecordFromFile(inputPath);
+
+   // Replace each element with its String form so the final comparison is String against String.
+   GenericArray expectedQuotes = (GenericArray) source.get(fieldName);
+   for (int idx = 0; idx < expectedQuotes.size(); idx++) {
+     Object element = expectedQuotes.get(idx);
+     expectedQuotes.set(idx, element.toString());
+   }
+
+   Schema schemaBefore = source.getSchema();
+   Schema schemaAfter = encryptor.convertSchema(schemaBefore, state);
+
+   Iterable<GenericRecord> converted = encryptor.convertRecord(schemaAfter, source, state);
+   GenericRecord encryptedRecord = converted.iterator().next();
+
+   Assert.assertEquals(schemaAfter, schemaBefore);
+
+   // Decode every encrypted element independently and collect the plaintext.
+   GenericArray<String> encryptedQuotes = (GenericArray<String>) encryptedRecord.get(fieldName);
+   List<String> decodedQuotes = Lists.newArrayList();
+   for (String cipherText : encryptedQuotes) {
+     InsecureShiftCodec shiftCodec = new InsecureShiftCodec(Maps.<String, Object>newHashMap());
+     byte[] cipherBytes = cipherText.getBytes(StandardCharsets.UTF_8);
+     InputStream decoded = shiftCodec.decodeInputStream(new ByteArrayInputStream(cipherBytes));
+     byte[] plainBytes = new byte[decoded.available()];
+     decoded.read(plainBytes);
+     decodedQuotes.add(new String(plainBytes, StandardCharsets.UTF_8));
+   }
+
+   Assert.assertEquals(decodedQuotes, expectedQuotes);
+ }
+
+ /**
+  * Creates a string-array value containing {@code "one"}, {@code "two"} and {@code "three"},
+  * in that order.
+  *
+  * @return a new generic array of strings
+  */
+ private GenericArray<String> buildTestArray() {
+   Schema stringArraySchema = Schema.createArray(Schema.create(Schema.Type.STRING));
+   String[] items = {"one", "two", "three"};
+
+   GenericArray<String> result = new GenericData.Array<>(items.length, stringArraySchema);
+   for (String item : items) {
+     result.add(item);
+   }
+
+   return result;
+ }
+
+ private GenericRecord getRecordFromFile(String path)
+ throws IOException {
DatumReader<GenericRecord> reader = new GenericDatumReader<>();
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(new File(path), reader);
- while (dataFileReader.hasNext()) {
+ if (dataFileReader.hasNext()) {
return dataFileReader.next();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3323543b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro
new file mode 100644
index 0000000..c10a607
Binary files /dev/null and b/gobblin-modules/gobblin-crypto-provider/src/test/resources/fieldPickInput_arrays.avro differ