You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/05 02:08:41 UTC
[beam] branch master updated: [BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e7d3e8c9814 [BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)
e7d3e8c9814 is described below
commit e7d3e8c9814c8a9fec647a48818edaeca94dcb42
Author: Vitaly Terentyev <vi...@akvelon.com>
AuthorDate: Thu May 5 05:08:28 2022 +0300
[BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)
* [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
* [BEAM-14048] Fix checkstyle
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warning
* [BEAM-14048] Fix warning
* [BEAM-14048] Remove unused dependencies
* [BEAM-14048] Add needed dependencies
* [BEAM-14048] Fix spotless
* [BEAM-14048] Fix typo
* [BEAM-14048] Use fori instead of stream
* [BEAM-14048] Suppress warning
* [BEAM-14048] Add used undeclared artifacts
* [BEAM-14048] Change dependencies to test
* [BEAM-14048] Refactoring
* [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
* [BEAM-14048] Suppress warning
* Update maven repo
* Update build.gradle
* [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
* [BEAM-14048] Set macroFields
Co-authored-by: Alex Kosolapov <al...@gmail.com>
Co-authored-by: Elizaveta Lomteva <el...@akvelon.com>
Co-authored-by: Elizaveta Lomteva <57...@users.noreply.github.com>
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 5 +
.../src/main/resources/beam/suppressions.xml | 1 +
sdks/java/io/cdap/OWNERS | 1 +
sdks/java/io/cdap/build.gradle | 52 ++++++++
.../org/apache/beam/sdk/io/cdap/ConfigWrapper.java | 88 +++++++++++++
.../io/cdap/PluginConfigInstantiationUtils.java | 98 +++++++++++++++
.../org/apache/beam/sdk/io/cdap/package-info.java | 24 ++++
.../apache/beam/sdk/io/cdap/ConfigWrapperTest.java | 139 +++++++++++++++++++++
.../cdap/PluginConfigInstantiationUtilsTest.java | 107 ++++++++++++++++
.../test/resources/service_now_test_params.json | 11 ++
settings.gradle.kts | 1 +
11 files changed, 527 insertions(+)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bfc8b585af0..8f9d2336336 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -451,6 +451,7 @@ class BeamModulePlugin implements Plugin<Project> {
def aws_java_sdk_version = "1.12.135"
def aws_java_sdk2_version = "2.17.127"
def cassandra_driver_version = "3.10.2"
+ def cdap_version = "6.5.1"
def checkerframework_version = "3.10.0"
def classgraph_version = "4.8.104"
def errorprone_version = "2.10.0"
@@ -536,6 +537,10 @@ class BeamModulePlugin implements Plugin<Project> {
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
cassandra_driver_mapping : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
+ cdap_api : "io.cdap.cdap:cdap-api:$cdap_version",
+ cdap_common : "io.cdap.cdap:cdap-common:$cdap_version",
+ cdap_etl_api : "io.cdap.cdap:cdap-etl-api:$cdap_version",
+ cdap_plugin_service_now : "io.cdap.plugin:servicenow-plugins:1.1.0",
checker_qual : "org.checkerframework:checker-qual:$checkerframework_version",
classgraph : "io.github.classgraph:classgraph:$classgraph_version",
commons_codec : "commons-codec:commons-codec:1.15",
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 63e15e0199d..c050bc5540a 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -89,6 +89,7 @@
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*BeamZetaSqlCatalog\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*BufferingPullSubscriberTest\.java" />
+ <suppress id="ForbidNonVendoredGuava" files=".*cdap.*PluginConfigInstantiationUtils\.java" />
<!-- gRPC/protobuf exceptions -->
<!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
diff --git a/sdks/java/io/cdap/OWNERS b/sdks/java/io/cdap/OWNERS
new file mode 100644
index 00000000000..e60eec69932
--- /dev/null
+++ b/sdks/java/io/cdap/OWNERS
@@ -0,0 +1 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
new file mode 100644
index 00000000000..4ef361924d7
--- /dev/null
+++ b/sdks/java/io/cdap/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+ id 'java'
+ id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+ exportJavadoc: false,
+ automaticModuleName: 'org.apache.beam.sdk.io.cdap',
+)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: CDAP :: Java"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for integration with CDAP plugins."""
+
+/** Define the list of runners which execute a precommit test.
+ * Some runners are run from separate projects, see the preCommit task below
+ * for details.
+ */
+
+dependencies {
+ implementation library.java.guava
+ implementation library.java.cdap_api
+ implementation library.java.cdap_common
+ implementation library.java.jackson_core
+ implementation library.java.jackson_databind
+ implementation library.java.slf4j_api
+ implementation project(path: ":sdks:java:core", configuration: "shadow")
+ testImplementation library.java.cdap_plugin_service_now
+ testImplementation library.java.cdap_etl_api
+ testImplementation library.java.vendored_guava_26_0_jre
+ testImplementation library.java.junit
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
new file mode 100644
index 00000000000..9a2124e21b4
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class for building {@link PluginConfig} object of the specific class {@param <T>}. */
+public class ConfigWrapper<T extends PluginConfig> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigWrapper.class);
+
+ @Nullable private Map<String, Object> paramsMap = null;
+ private final Class<T> configClass;
+
+ public ConfigWrapper(Class<T> configClass) {
+ this.configClass = configClass;
+ }
+
+ public ConfigWrapper<T> fromJsonString(String jsonString) throws IOException {
+ TypeReference<HashMap<String, Object>> typeRef =
+ new TypeReference<HashMap<String, Object>>() {};
+ try {
+ paramsMap = new ObjectMapper().readValue(jsonString, typeRef);
+ } catch (IOException e) {
+ LOG.error("Can not read json string to params map", e);
+ throw e;
+ }
+ return this;
+ }
+
+ public ConfigWrapper<T> fromJsonFile(File jsonFile) throws IOException {
+ TypeReference<HashMap<String, Object>> typeRef =
+ new TypeReference<HashMap<String, Object>>() {};
+ try {
+ paramsMap = new ObjectMapper().readValue(jsonFile, typeRef);
+ } catch (IOException e) {
+ LOG.error("Can not read json file to params map", e);
+ throw e;
+ }
+ return this;
+ }
+
+ public ConfigWrapper<T> withParams(Map<String, Object> paramsMap) {
+ this.paramsMap = new HashMap<>(paramsMap);
+ return this;
+ }
+
+ public ConfigWrapper<T> setParam(String paramName, Object param) {
+ getParamsMap().put(paramName, param);
+ return this;
+ }
+
+ public @Nullable T build() {
+ return PluginConfigInstantiationUtils.getPluginConfig(getParamsMap(), configClass);
+ }
+
+ private @Nonnull Map<String, Object> getParamsMap() {
+ if (paramsMap == null) {
+ paramsMap = new HashMap<>();
+ }
+ return paramsMap;
+ }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
new file mode 100644
index 00000000000..f13a3e7f7e9
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class for getting any filled {@link PluginConfig} configuration object. */
+@SuppressWarnings({"assignment.type.incompatible", "UnstableApiUsage", "return.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+ private static final String MACRO_FIELDS_FIELD_NAME = "macroFields";
+
+ /**
+ * Method for instantiating {@link PluginConfig} object of specific class {@param configClass}.
+ * After instantiating, it will go over all {@link Field}s with the {@link Name} annotation and
+ * set the appropriate parameter values from the {@param params} map for them.
+ *
+ * @param params map of config fields, where key is the name of the field, value must be String or
+ * boxed primitive
+ * @return Config object for given map of arguments and configuration class
+ */
+ static @Nullable <T extends PluginConfig> T getPluginConfig(
+ Map<String, Object> params, Class<T> configClass) {
+ // Validate configClass
+ if (configClass == null) {
+ throw new IllegalArgumentException("Config class must be not null!");
+ }
+ List<Field> allFields = new ArrayList<>();
+ Class<?> currClass = configClass;
+ while (currClass != null && !currClass.equals(Object.class)) {
+ allFields.addAll(
+ Arrays.stream(currClass.getDeclaredFields())
+ .filter(f -> !Modifier.isStatic(f.getModifiers()))
+ .collect(Collectors.toList()));
+ currClass = currClass.getSuperclass();
+ }
+ InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+
+ T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
+
+ if (config != null) {
+ for (Field field : allFields) {
+ field.setAccessible(true);
+
+ Class<?> fieldType = field.getType();
+
+ Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+ Object fieldValue =
+ declaredAnnotation != null ? params.get(declaredAnnotation.value()) : null;
+
+ if (fieldValue != null && fieldType.equals(fieldValue.getClass())) {
+ try {
+ field.set(config, fieldValue);
+ } catch (IllegalAccessException e) {
+ LOG.error("Can not set a field with value {}", fieldValue);
+ }
+ } else if (field.getName().equals(MACRO_FIELDS_FIELD_NAME)) {
+ try {
+ field.set(config, Collections.emptySet());
+ } catch (IllegalAccessException e) {
+ LOG.error("Can not set macro fields");
+ }
+ }
+ }
+ }
+ return config;
+ }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java
new file mode 100644
index 00000000000..862b13a856f
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from CDAP. */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.sdk.io.cdap;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java
new file mode 100644
index 00000000000..55f3ab49050
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import java.io.File;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test class for {@link ConfigWrapper}. */
+@RunWith(JUnit4.class)
+public class ConfigWrapperTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigWrapperTest.class);
+
+ private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+ ImmutableMap.<String, java.lang.Object>builder()
+ .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+ .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+ .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+ .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+ .put(ServiceNowConstants.PROPERTY_USER, "user")
+ .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+ .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+ .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+ .put("referenceName", "oldReference")
+ .build();
+
+ private static final String TEST_SERVICE_NOW_PARAMS_JSON_STRING =
+ "{\n"
+ + "\"clientId\": \"clientId\",\n"
+ + "\"clientSecret\": \"clientSecret\",\n"
+ + "\"restApiEndpoint\": \"https://www.google.com\",\n"
+ + "\"queryMode\": \"Table\",\n"
+ + "\"user\": \"user\",\n"
+ + "\"password\": \"password\",\n"
+ + "\"tableName\": \"tableName\",\n"
+ + "\"valueType\": \"Actual\",\n"
+ + "\"referenceName\": \"oldReference\"\n"
+ + "}";
+ private static final String SERVICE_NOW_TEST_PARAMS_JSON =
+ "src/test/resources/service_now_test_params.json";
+ public static final String REFERENCE_NAME_PARAM_NAME = "referenceName";
+
+ @Test
+ public void testBuildingPluginConfigFromParamsMap() {
+ try {
+ String newReferenceName = "new reference name";
+ ServiceNowSourceConfig config =
+ new ConfigWrapper<>(ServiceNowSourceConfig.class)
+ .withParams(TEST_SERVICE_NOW_PARAMS_MAP)
+ .setParam("referenceName", newReferenceName)
+ .build();
+ assertNotNull(config);
+ validateServiceNowConfigObject(TEST_SERVICE_NOW_PARAMS_MAP, config);
+ assertEquals(newReferenceName, config.referenceName);
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the config object", e);
+ fail();
+ }
+ }
+
+ @Test
+ public void testBuildingPluginConfigFromJsonFile() {
+ try {
+ String newReferenceName = "new reference name";
+ ServiceNowSourceConfig config =
+ new ConfigWrapper<>(ServiceNowSourceConfig.class)
+ .fromJsonFile(new File(SERVICE_NOW_TEST_PARAMS_JSON))
+ .setParam(REFERENCE_NAME_PARAM_NAME, newReferenceName)
+ .build();
+ assertNotNull(config);
+ validateServiceNowConfigObject(TEST_SERVICE_NOW_PARAMS_MAP, config);
+ assertEquals(newReferenceName, config.referenceName);
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the config object", e);
+ fail();
+ }
+ }
+
+ @Test
+ public void testBuildingPluginConfigFromJsonString() {
+ try {
+ String newReferenceName = "new reference name";
+ ServiceNowSourceConfig config =
+ new ConfigWrapper<>(ServiceNowSourceConfig.class)
+ .fromJsonString(TEST_SERVICE_NOW_PARAMS_JSON_STRING)
+ .setParam(REFERENCE_NAME_PARAM_NAME, newReferenceName)
+ .build();
+ assertNotNull(config);
+ validateServiceNowConfigObject(TEST_SERVICE_NOW_PARAMS_MAP, config);
+ assertEquals(newReferenceName, config.referenceName);
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the config object", e);
+ fail();
+ }
+ }
+
+ private static void validateServiceNowConfigObject(
+ Map<String, Object> params, ServiceNowSourceConfig config) {
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_ID), config.getClientId());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_SECRET), config.getClientSecret());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_API_ENDPOINT), config.getRestApiEndpoint());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_QUERY_MODE), config.getQueryMode().getValue());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_USER), config.getUser());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_PASSWORD), config.getPassword());
+ assertNotNull(config.getValueType());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_VALUE_TYPE), config.getValueType().getValueType());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_TABLE_NAME), config.getTableName());
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java
new file mode 100644
index 00000000000..90f4e498806
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test class for {@link PluginConfigInstantiationUtils}. */
+@RunWith(JUnit4.class)
+public class PluginConfigInstantiationUtilsTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PluginConfigInstantiationUtilsTest.class);
+
+ private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+ ImmutableMap.<String, java.lang.Object>builder()
+ .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+ .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+ .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+ .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+ .put(ServiceNowConstants.PROPERTY_USER, "user")
+ .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+ .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+ .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+ .put("referenceName", "oldReference")
+ .build();
+
+ @Test
+ public void testBuildingPluginConfigFromParamsMap() {
+ try {
+ ServiceNowSourceConfig config =
+ PluginConfigInstantiationUtils.getPluginConfig(
+ TEST_SERVICE_NOW_PARAMS_MAP, ServiceNowSourceConfig.class);
+ assertNotNull(config);
+ validateServiceNowConfigObject(TEST_SERVICE_NOW_PARAMS_MAP, config);
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the config object", e);
+ fail();
+ }
+ }
+
+ @Test
+ public void testBuildingPluginConfigFromEmptyParamsMap() {
+ try {
+ ServiceNowSourceConfig config =
+ PluginConfigInstantiationUtils.getPluginConfig(
+ new HashMap<>(), ServiceNowSourceConfig.class);
+ assertNotNull(config);
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the config object", e);
+ fail();
+ }
+ }
+
+ @Test
+ public void testBuildingPluginConfigFromNullClassFail() {
+ try {
+ PluginConfigInstantiationUtils.getPluginConfig(TEST_SERVICE_NOW_PARAMS_MAP, null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ assertEquals("Config class must be not null!", e.getMessage());
+ }
+ }
+
+ private static void validateServiceNowConfigObject(
+ Map<String, Object> params, ServiceNowSourceConfig config) {
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_ID), config.getClientId());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_SECRET), config.getClientSecret());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_API_ENDPOINT), config.getRestApiEndpoint());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_QUERY_MODE), config.getQueryMode().getValue());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_USER), config.getUser());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_PASSWORD), config.getPassword());
+ assertNotNull(config.getValueType());
+ assertEquals(
+ params.get(ServiceNowConstants.PROPERTY_VALUE_TYPE), config.getValueType().getValueType());
+ assertEquals(params.get(ServiceNowConstants.PROPERTY_TABLE_NAME), config.getTableName());
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/resources/service_now_test_params.json b/sdks/java/io/cdap/src/test/resources/service_now_test_params.json
new file mode 100644
index 00000000000..96bbdc7383f
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/resources/service_now_test_params.json
@@ -0,0 +1,11 @@
+{
+ "clientId": "clientId",
+ "clientSecret": "clientSecret",
+ "restApiEndpoint": "https://www.google.com",
+ "queryMode": "Table",
+ "user": "user",
+ "password": "password",
+ "tableName": "tableName",
+ "valueType": "Actual",
+ "referenceName": "oldReference"
+}
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 305d0af786f..7889afe0526 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -165,6 +165,7 @@ include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
include(":sdks:java:io:expansion-service")
include(":sdks:java:io:file-based-io-tests")
include(":sdks:java:io:bigquery-io-perf-tests")
+include(":sdks:java:io:cdap")
include(":sdks:java:io:google-cloud-platform")
include(":sdks:java:io:google-cloud-platform:expansion-service")
include(":sdks:java:io:hadoop-common")