You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/03/02 17:33:46 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1066] field projection with namespace
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 04fc0dd [GOBBLIN-1066] field projection with namespace
04fc0dd is described below
commit 04fc0ddca2a00933cf0248d01d106487b529462f
Author: zhchen <zh...@linkedin.com>
AuthorDate: Mon Mar 2 09:33:39 2020 -0800
[GOBBLIN-1066] field projection with namespace
Closes #2904 from zxcware/hsec
---
.../converter/filter/AvroProjectionConverter.java | 5 ++
.../filter/AvroProjectionConverterTest.java | 62 ++++++++++++++++++++++
.../avroProjectionConverter/simpleRecord.avsc | 17 ++++++
.../avroProjectionConverter/simpleRecord.json | 4 ++
4 files changed, 88 insertions(+)
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
index 1533f94..2d2202f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
@@ -42,6 +42,7 @@ import org.apache.gobblin.util.AvroUtils;
public class AvroProjectionConverter extends AvroToAvroConverterBase {
public static final String REMOVE_FIELDS = ".remove.fields";
+ public static final String USE_NAMESPACE = "avroProjectionConverter.useNamespace";
private Optional<AvroSchemaFieldRemover> fieldRemover;
@@ -55,6 +56,10 @@ public class AvroProjectionConverter extends AvroToAvroConverterBase {
public AvroProjectionConverter init(WorkUnitState workUnit) {
if (workUnit.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY)) {
String removeFieldsPropName = workUnit.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY) + REMOVE_FIELDS;
+ if (workUnit.getPropAsBoolean(USE_NAMESPACE)) {
+ removeFieldsPropName = String.format("%s.%s",
+ workUnit.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY), removeFieldsPropName);
+ }
if (workUnit.contains(removeFieldsPropName)) {
this.fieldRemover = Optional.of(new AvroSchemaFieldRemover(workUnit.getProp(removeFieldsPropName)));
} else {
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroProjectionConverterTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroProjectionConverterTest.java
new file mode 100644
index 0000000..f176a67
--- /dev/null
+++ b/gobblin-core-base/src/test/java/org/apache/gobblin/converter/filter/AvroProjectionConverterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.filter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.util.test.TestIOUtils;
+
+
+public class AvroProjectionConverterTest {
+
+ @Test
+ public void testRemoveWithNamespace()
+ throws Exception {
+ // With USE_NAMESPACE=true the remove-fields key is looked up as "<namespace>.<table>.remove.fields".
+ WorkUnitState wus = new WorkUnitState();
+ wus.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "db1");
+ wus.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table1");
+ wus.setProp(AvroProjectionConverter.USE_NAMESPACE, true); // prefix the lookup key with the namespace "db1"
+ // NOTE(review): URL.getPath() is not URL-decoded; resource paths containing spaces may fail to resolve.
+ GenericRecord inputRecord = TestIOUtils.readAllRecords(
+ getClass().getResource("/avroProjectionConverter/simpleRecord.json").getPath(),
+ getClass().getResource("/avroProjectionConverter/simpleRecord.avsc").getPath()).get(0);
+ Schema inputSchema = inputRecord.getSchema(); // two fields: "id" and "created"
+
+ AvroProjectionConverter converter = new AvroProjectionConverter();
+ // Only the un-namespaced key "table1.remove.fields" is set, so it must be ignored and both fields kept.
+ wus.setProp("table1.remove.fields", "id");
+ converter.init(wus);
+ Schema outputSchema = converter.convertSchema(inputSchema, wus);
+ Assert.assertEquals(outputSchema.getFields().size(), 2);
+
+ // Once the namespaced key "db1.table1.remove.fields" is set, "id" is dropped from the schema and the record.
+ wus.setProp("db1.table1.remove.fields", "id");
+ converter.init(wus); // re-init so the converter picks up the newly set property
+ outputSchema = converter.convertSchema(inputSchema, wus);
+ Assert.assertEquals(outputSchema.getFields().size(), 1);
+
+ GenericRecord outputRecord = converter.convertRecord(outputSchema, inputRecord, wus).iterator().next();
+ Assert.assertEquals(outputRecord.toString(), "{\"created\": 20170906185911}");
+ }
+}
diff --git a/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.avsc b/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.avsc
new file mode 100644
index 0000000..0dcbc08
--- /dev/null
+++ b/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.avsc
@@ -0,0 +1,17 @@
+{
+ "type": "record",
+ "name": "SimpleRecord",
+ "namespace": "org.apache.gobblin.test",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string",
+ "doc": "ID of the record."
+ },
+ {
+ "name": "created",
+ "type": "long",
+ "doc": "a time stamp."
+ }
+ ]
+}
\ No newline at end of file
diff --git a/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.json b/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.json
new file mode 100644
index 0000000..a0a90c1
--- /dev/null
+++ b/gobblin-core-base/src/test/resources/avroProjectionConverter/simpleRecord.json
@@ -0,0 +1,4 @@
+{
+ "id" : "simpleRecord-20170906185911",
+ "created" : 20170906185911
+}
\ No newline at end of file