You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/05 02:08:41 UTC

[beam] branch master updated: [BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e7d3e8c9814 [BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)
e7d3e8c9814 is described below

commit e7d3e8c9814c8a9fec647a48818edaeca94dcb42
Author: Vitaly Terentyev <vi...@akvelon.com>
AuthorDate: Thu May 5 05:08:28 2022 +0300

    [BEAM-14048] [CdapIO] Add ConfigWrapper for building CDAP PluginConfigs (#17051)
    
    * [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
    
    * [BEAM-14048] Fix checkstyle
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Remove unused dependencies
    
    * [BEAM-14048] Add needed dependencies
    
    * [BEAM-14048] Fix spotless
    
    * [BEAM-14048] Fix typo
    
    * [BEAM-14048] Use fori instead of stream
    
    * [BEAM-14048] Suppress warning
    
    * [BEAM-14048] Add used undeclared artifacts
    
    * [BEAM-14048] Change dependencies to test
    
    * [BEAM-14048] Refactoring
    
    * [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
    
    * [BEAM-14048] Suppress warning
    
    * Update maven repo
    
    * Update build.gradle
    
    * [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
    
    * [BEAM-14048] Set macroFields
    
    Co-authored-by: Alex Kosolapov <al...@gmail.com>
    Co-authored-by: Elizaveta Lomteva <el...@akvelon.com>
    Co-authored-by: Elizaveta Lomteva <57...@users.noreply.github.com>
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   5 +
 .../src/main/resources/beam/suppressions.xml       |   1 +
 sdks/java/io/cdap/OWNERS                           |   1 +
 sdks/java/io/cdap/build.gradle                     |  52 ++++++++
 .../org/apache/beam/sdk/io/cdap/ConfigWrapper.java |  88 +++++++++++++
 .../io/cdap/PluginConfigInstantiationUtils.java    |  98 +++++++++++++++
 .../org/apache/beam/sdk/io/cdap/package-info.java  |  24 ++++
 .../apache/beam/sdk/io/cdap/ConfigWrapperTest.java | 139 +++++++++++++++++++++
 .../cdap/PluginConfigInstantiationUtilsTest.java   | 107 ++++++++++++++++
 .../test/resources/service_now_test_params.json    |  11 ++
 settings.gradle.kts                                |   1 +
 11 files changed, 527 insertions(+)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index bfc8b585af0..8f9d2336336 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -451,6 +451,7 @@ class BeamModulePlugin implements Plugin<Project> {
     def aws_java_sdk_version = "1.12.135"
     def aws_java_sdk2_version = "2.17.127"
     def cassandra_driver_version = "3.10.2"
+    def cdap_version = "6.5.1"
     def checkerframework_version = "3.10.0"
     def classgraph_version = "4.8.104"
     def errorprone_version = "2.10.0"
@@ -536,6 +537,10 @@ class BeamModulePlugin implements Plugin<Project> {
         bigdataoss_util                             : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
         cassandra_driver_core                       : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
         cassandra_driver_mapping                    : "com.datastax.cassandra:cassandra-driver-mapping:$cassandra_driver_version",
+        cdap_api                                    : "io.cdap.cdap:cdap-api:$cdap_version",
+        cdap_common                                 : "io.cdap.cdap:cdap-common:$cdap_version",
+        cdap_etl_api                                : "io.cdap.cdap:cdap-etl-api:$cdap_version",
+        cdap_plugin_service_now                     : "io.cdap.plugin:servicenow-plugins:1.1.0",
         checker_qual                                : "org.checkerframework:checker-qual:$checkerframework_version",
         classgraph                                  : "io.github.classgraph:classgraph:$classgraph_version",
         commons_codec                               : "commons-codec:commons-codec:1.15",
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index 63e15e0199d..c050bc5540a 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -89,6 +89,7 @@
   <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*BeamZetaSqlCatalog\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*BufferingPullSubscriberTest\.java" />
+  <suppress id="ForbidNonVendoredGuava" files=".*cdap.*PluginConfigInstantiationUtils\.java" />
 
   <!-- gRPC/protobuf exceptions -->
   <!-- Non-vendored gRPC/protobuf imports are allowed for files that depend on libraries that expose gRPC/protobuf in its public API -->
diff --git a/sdks/java/io/cdap/OWNERS b/sdks/java/io/cdap/OWNERS
new file mode 100644
index 00000000000..e60eec69932
--- /dev/null
+++ b/sdks/java/io/cdap/OWNERS
@@ -0,0 +1 @@
+# See the OWNERS docs at https://s.apache.org/beam-owners
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
new file mode 100644
index 00000000000..4ef361924d7
--- /dev/null
+++ b/sdks/java/io/cdap/build.gradle
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+    id 'java'
+    id 'org.apache.beam.module'
+}
+
+applyJavaNature(
+        exportJavadoc: false,
+        automaticModuleName: 'org.apache.beam.sdk.io.cdap',
+)
+provideIntegrationTestingDependencies()
+enableJavaPerformanceTesting()
+
+description = "Apache Beam :: CDAP :: Java"
+ext.summary = """Apache Beam SDK provides a simple, Java-based
+interface for integration with CDAP plugins."""
+
+/** NOTE(review): this comment used to describe a list of precommit runners and a
+ * preCommit task "below", but this file defines neither; presumably the text was
+ * copied from another module's build file. Only the dependencies block follows.
+ */
+
+dependencies {
+    // Non-vendored Guava: PluginConfigInstantiationUtils hands a Guava TypeToken to
+    // CDAP's InstantiatorFactory.
+    implementation library.java.guava
+    implementation library.java.cdap_api
+    implementation library.java.cdap_common
+    implementation library.java.jackson_core
+    implementation library.java.jackson_databind
+    implementation library.java.slf4j_api
+    implementation project(path: ":sdks:java:core", configuration: "shadow")
+    // Test-only: the ServiceNow plugin supplies ServiceNowSourceConfig, the sample
+    // PluginConfig that the unit tests build.
+    testImplementation library.java.cdap_plugin_service_now
+    testImplementation library.java.cdap_etl_api
+    testImplementation library.java.vendored_guava_26_0_jre
+    testImplementation library.java.junit
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
new file mode 100644
index 00000000000..9a2124e21b4
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/ConfigWrapper.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for building {@link PluginConfig} object of the specific class {@code T}.
+ *
+ * <p>Parameters are collected in a name-to-value map, either all at once ({@link
+ * #fromJsonString}, {@link #fromJsonFile}, {@link #withParams}) or one at a time ({@link
+ * #setParam}), and are applied to a freshly created config object by {@link #build()}.
+ */
+public class ConfigWrapper<T extends PluginConfig> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigWrapper.class);
+
+  // A single mapper is shared: creating one per call is wasteful and this class never
+  // reconfigures it.
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final TypeReference<HashMap<String, Object>> PARAMS_MAP_TYPE =
+      new TypeReference<HashMap<String, Object>>() {};
+
+  @Nullable private Map<String, Object> paramsMap = null;
+  private final Class<T> configClass;
+
+  /**
+   * Creates a wrapper that builds configs of the given class.
+   *
+   * @param configClass class of the {@link PluginConfig} to build
+   */
+  public ConfigWrapper(Class<T> configClass) {
+    this.configClass = configClass;
+  }
+
+  /**
+   * Replaces the collected parameters with the entries of the given JSON object.
+   *
+   * @param jsonString JSON object whose keys are parameter names
+   * @return this wrapper, for chaining
+   * @throws IOException if the string can not be parsed into a map; the failure is also logged
+   */
+  public ConfigWrapper<T> fromJsonString(String jsonString) throws IOException {
+    try {
+      paramsMap = OBJECT_MAPPER.readValue(jsonString, PARAMS_MAP_TYPE);
+    } catch (IOException e) {
+      LOG.error("Can not read json string to params map", e);
+      throw e;
+    }
+    return this;
+  }
+
+  /**
+   * Replaces the collected parameters with the entries of the JSON object stored in the file.
+   *
+   * @param jsonFile file containing a JSON object whose keys are parameter names
+   * @return this wrapper, for chaining
+   * @throws IOException if the file can not be read or parsed into a map; the failure is also
+   *     logged
+   */
+  public ConfigWrapper<T> fromJsonFile(File jsonFile) throws IOException {
+    try {
+      paramsMap = OBJECT_MAPPER.readValue(jsonFile, PARAMS_MAP_TYPE);
+    } catch (IOException e) {
+      LOG.error("Can not read json file to params map", e);
+      throw e;
+    }
+    return this;
+  }
+
+  /**
+   * Replaces the collected parameters with a copy of the given map, so later changes to the
+   * caller's map do not affect this wrapper.
+   *
+   * @param paramsMap parameter names mapped to their values
+   * @return this wrapper, for chaining
+   */
+  public ConfigWrapper<T> withParams(Map<String, Object> paramsMap) {
+    this.paramsMap = new HashMap<>(paramsMap);
+    return this;
+  }
+
+  /**
+   * Adds a single parameter, overwriting any value already stored under the same name.
+   *
+   * @param paramName name of the parameter
+   * @param param value of the parameter
+   * @return this wrapper, for chaining
+   */
+  public ConfigWrapper<T> setParam(String paramName, Object param) {
+    getParamsMap().put(paramName, param);
+    return this;
+  }
+
+  /**
+   * Creates the config object and fills it with the collected parameters.
+   *
+   * @return the result of {@link PluginConfigInstantiationUtils#getPluginConfig}, which may be
+   *     null
+   */
+  public @Nullable T build() {
+    return PluginConfigInstantiationUtils.getPluginConfig(getParamsMap(), configClass);
+  }
+
+  /** Returns the params map, creating an empty one first if none has been set yet. */
+  private @Nonnull Map<String, Object> getParamsMap() {
+    if (paramsMap == null) {
+      paramsMap = new HashMap<>();
+    }
+    return paramsMap;
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
new file mode 100644
index 00000000000..f13a3e7f7e9
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class for getting any filled {@link PluginConfig} configuration object. */
+@SuppressWarnings({"assignment.type.incompatible", "UnstableApiUsage", "return.type.incompatible"})
+public class PluginConfigInstantiationUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PluginConfigInstantiationUtils.class);
+  private static final String MACRO_FIELDS_FIELD_NAME = "macroFields";
+
+  /**
+   * Method for instantiating {@link PluginConfig} object of specific class {@code configClass}.
+   * After instantiating, it will go over all non-static {@link Field}s of the class and its
+   * superclasses. A field with the {@link Name} annotation is set to the value stored under that
+   * name in the {@code params} map, provided the value is non-null and assignable to the field; a
+   * primitive field accepts the matching boxed value. A field named {@code macroFields} that was
+   * not set this way is set to an empty set. A value of an unsuitable type is skipped with a
+   * warning.
+   *
+   * @param params map of config fields, where key is the name of the field, value must be String or
+   *     boxed primitive
+   * @param configClass class of the config object to create
+   * @return Config object for given map of arguments and configuration class
+   * @throws IllegalArgumentException if {@code configClass} is null
+   */
+  static @Nullable <T extends PluginConfig> T getPluginConfig(
+      Map<String, Object> params, Class<T> configClass) {
+    // Validate configClass
+    if (configClass == null) {
+      throw new IllegalArgumentException("Config class must be not null!");
+    }
+    List<Field> allFields = collectInstanceFields(configClass);
+    InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+
+    T config = instantiatorFactory.get(TypeToken.of(configClass)).create();
+    if (config == null) {
+      return null;
+    }
+
+    for (Field field : allFields) {
+      Name declaredAnnotation = field.getDeclaredAnnotation(Name.class);
+      Object fieldValue =
+          declaredAnnotation != null ? params.get(declaredAnnotation.value()) : null;
+
+      // Compare against the boxed type: params only ever hold objects, so an exact class
+      // comparison would never match a primitive field such as int.
+      if (fieldValue != null && boxedType(field.getType()).isInstance(fieldValue)) {
+        setField(config, field, fieldValue);
+      } else if (field.getName().equals(MACRO_FIELDS_FIELD_NAME)) {
+        setField(config, field, Collections.emptySet());
+      } else if (fieldValue != null) {
+        LOG.warn(
+            "Skipping field {}: value of type {} is not assignable to {}",
+            field.getName(),
+            fieldValue.getClass().getName(),
+            field.getType().getName());
+      }
+    }
+    return config;
+  }
+
+  /** Returns the non-static fields declared by {@code configClass} and all its superclasses. */
+  private static List<Field> collectInstanceFields(Class<?> configClass) {
+    List<Field> allFields = new ArrayList<>();
+    Class<?> currClass = configClass;
+    while (currClass != null && !currClass.equals(Object.class)) {
+      allFields.addAll(
+          Arrays.stream(currClass.getDeclaredFields())
+              .filter(f -> !Modifier.isStatic(f.getModifiers()))
+              .collect(Collectors.toList()));
+      currClass = currClass.getSuperclass();
+    }
+    return allFields;
+  }
+
+  /** Returns the wrapper class of a primitive {@code type}, or {@code type} itself otherwise. */
+  private static Class<?> boxedType(Class<?> type) {
+    // MethodType#wrap converts primitive types to their wrappers and leaves other types alone.
+    return java.lang.invoke.MethodType.methodType(type).wrap().returnType();
+  }
+
+  /** Makes {@code field} accessible and assigns {@code value} to it on {@code config}. */
+  private static void setField(Object config, Field field, Object value) {
+    try {
+      field.setAccessible(true);
+      field.set(config, value);
+    } catch (IllegalAccessException e) {
+      // The value itself is deliberately not logged: plugin configs may carry passwords and
+      // client secrets.
+      LOG.error("Can not set field {}", field.getName(), e);
+    }
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java
new file mode 100644
index 00000000000..862b13a856f
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Transforms for reading and writing from CDAP. */
+@Experimental(Kind.SOURCE_SINK)
+package org.apache.beam.sdk.io.cdap;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java
new file mode 100644
index 00000000000..55f3ab49050
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/ConfigWrapperTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import java.io.File;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link ConfigWrapper}, using {@link ServiceNowSourceConfig} as the sample config. */
+@RunWith(JUnit4.class)
+public class ConfigWrapperTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ConfigWrapperTest.class);
+
+  public static final String REFERENCE_NAME_PARAM_NAME = "referenceName";
+
+  /** Reference name every test sets on top of the base parameters. */
+  private static final String OVERRIDDEN_REFERENCE_NAME = "new reference name";
+
+  private static final String SERVICE_NOW_TEST_PARAMS_JSON =
+      "src/test/resources/service_now_test_params.json";
+
+  /** Base parameters; the JSON string and the JSON file describe the same values. */
+  private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+      ImmutableMap.<String, Object>builder()
+          .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+          .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+          .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+          .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+          .put(ServiceNowConstants.PROPERTY_USER, "user")
+          .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+          .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+          .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+          .put(REFERENCE_NAME_PARAM_NAME, "oldReference")
+          .build();
+
+  private static final String TEST_SERVICE_NOW_PARAMS_JSON_STRING =
+      String.join(
+          "\n",
+          "{",
+          "\"clientId\": \"clientId\",",
+          "\"clientSecret\": \"clientSecret\",",
+          "\"restApiEndpoint\": \"https://www.google.com\",",
+          "\"queryMode\": \"Table\",",
+          "\"user\": \"user\",",
+          "\"password\": \"password\",",
+          "\"tableName\": \"tableName\",",
+          "\"valueType\": \"Actual\",",
+          "\"referenceName\": \"oldReference\"",
+          "}");
+
+  @Test
+  public void testBuildingPluginConfigFromParamsMap() {
+    try {
+      ConfigWrapper<ServiceNowSourceConfig> wrapper =
+          new ConfigWrapper<>(ServiceNowSourceConfig.class);
+      assertBuildsWithOverriddenReference(wrapper.withParams(TEST_SERVICE_NOW_PARAMS_MAP));
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the config object", e);
+      fail();
+    }
+  }
+
+  @Test
+  public void testBuildingPluginConfigFromJsonFile() {
+    try {
+      ConfigWrapper<ServiceNowSourceConfig> wrapper =
+          new ConfigWrapper<>(ServiceNowSourceConfig.class);
+      assertBuildsWithOverriddenReference(
+          wrapper.fromJsonFile(new File(SERVICE_NOW_TEST_PARAMS_JSON)));
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the config object", e);
+      fail();
+    }
+  }
+
+  @Test
+  public void testBuildingPluginConfigFromJsonString() {
+    try {
+      ConfigWrapper<ServiceNowSourceConfig> wrapper =
+          new ConfigWrapper<>(ServiceNowSourceConfig.class);
+      assertBuildsWithOverriddenReference(
+          wrapper.fromJsonString(TEST_SERVICE_NOW_PARAMS_JSON_STRING));
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the config object", e);
+      fail();
+    }
+  }
+
+  /**
+   * Overrides the reference name on an already loaded wrapper, builds the config and checks that
+   * it carries the base parameters together with the overridden reference name.
+   */
+  private static void assertBuildsWithOverriddenReference(
+      ConfigWrapper<ServiceNowSourceConfig> loadedWrapper) {
+    ServiceNowSourceConfig built =
+        loadedWrapper.setParam(REFERENCE_NAME_PARAM_NAME, OVERRIDDEN_REFERENCE_NAME).build();
+    assertNotNull(built);
+    assertMatchesParams(TEST_SERVICE_NOW_PARAMS_MAP, built);
+    assertEquals(OVERRIDDEN_REFERENCE_NAME, built.referenceName);
+  }
+
+  /** Checks each ServiceNow getter against the value stored in {@code expected}. */
+  private static void assertMatchesParams(
+      Map<String, Object> expected, ServiceNowSourceConfig actual) {
+    assertEquals(expected.get(ServiceNowConstants.PROPERTY_CLIENT_ID), actual.getClientId());
+    assertEquals(
+        expected.get(ServiceNowConstants.PROPERTY_CLIENT_SECRET), actual.getClientSecret());
+    assertEquals(
+        expected.get(ServiceNowConstants.PROPERTY_API_ENDPOINT), actual.getRestApiEndpoint());
+    assertEquals(
+        expected.get(ServiceNowConstants.PROPERTY_QUERY_MODE), actual.getQueryMode().getValue());
+    assertEquals(expected.get(ServiceNowConstants.PROPERTY_USER), actual.getUser());
+    assertEquals(expected.get(ServiceNowConstants.PROPERTY_PASSWORD), actual.getPassword());
+    assertNotNull(actual.getValueType());
+    assertEquals(
+        expected.get(ServiceNowConstants.PROPERTY_VALUE_TYPE),
+        actual.getValueType().getValueType());
+    assertEquals(expected.get(ServiceNowConstants.PROPERTY_TABLE_NAME), actual.getTableName());
+  }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java
new file mode 100644
index 00000000000..90f4e498806
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginConfigInstantiationUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test class for {@link PluginConfigInstantiationUtils}. */
+@RunWith(JUnit4.class)
+public class PluginConfigInstantiationUtilsTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PluginConfigInstantiationUtilsTest.class);
+
+  private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+      ImmutableMap.<String, java.lang.Object>builder()
+          .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+          .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+          .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+          .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+          .put(ServiceNowConstants.PROPERTY_USER, "user")
+          .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+          .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+          .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+          .put("referenceName", "oldReference")
+          .build();
+
+  @Test
+  public void testBuildingPluginConfigFromParamsMap() {
+    try {
+      ServiceNowSourceConfig config =
+          PluginConfigInstantiationUtils.getPluginConfig(
+              TEST_SERVICE_NOW_PARAMS_MAP, ServiceNowSourceConfig.class);
+      assertNotNull(config);
+      validateServiceNowConfigObject(TEST_SERVICE_NOW_PARAMS_MAP, config);
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the config object", e);
+      fail();
+    }
+  }
+
+  @Test
+  public void testBuildingPluginConfigFromEmptyParamsMap() {
+    try {
+      ServiceNowSourceConfig config =
+          PluginConfigInstantiationUtils.getPluginConfig(
+              new HashMap<>(), ServiceNowSourceConfig.class);
+      assertNotNull(config);
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the config object", e);
+      fail();
+    }
+  }
+
+  @Test
+  public void testBuildingPluginConfigFromNullClassFail() {
+    try {
+      PluginConfigInstantiationUtils.getPluginConfig(TEST_SERVICE_NOW_PARAMS_MAP, null);
+      fail();
+    } catch (IllegalArgumentException e) {
+      assertEquals("Config class must be not null!", e.getMessage());
+    }
+  }
+
+  private static void validateServiceNowConfigObject(
+      Map<String, Object> params, ServiceNowSourceConfig config) {
+    assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_ID), config.getClientId());
+    assertEquals(params.get(ServiceNowConstants.PROPERTY_CLIENT_SECRET), config.getClientSecret());
+    assertEquals(
+        params.get(ServiceNowConstants.PROPERTY_API_ENDPOINT), config.getRestApiEndpoint());
+    assertEquals(
+        params.get(ServiceNowConstants.PROPERTY_QUERY_MODE), config.getQueryMode().getValue());
+    assertEquals(params.get(ServiceNowConstants.PROPERTY_USER), config.getUser());
+    assertEquals(params.get(ServiceNowConstants.PROPERTY_PASSWORD), config.getPassword());
+    assertNotNull(config.getValueType());
+    assertEquals(
+        params.get(ServiceNowConstants.PROPERTY_VALUE_TYPE), config.getValueType().getValueType());
+    assertEquals(params.get(ServiceNowConstants.PROPERTY_TABLE_NAME), config.getTableName());
+  }
+}
diff --git a/sdks/java/io/cdap/src/test/resources/service_now_test_params.json b/sdks/java/io/cdap/src/test/resources/service_now_test_params.json
new file mode 100644
index 00000000000..96bbdc7383f
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/resources/service_now_test_params.json
@@ -0,0 +1,11 @@
+{
+  "clientId": "clientId",
+  "clientSecret": "clientSecret",
+  "restApiEndpoint": "https://www.google.com",
+  "queryMode": "Table",
+  "user": "user",
+  "password": "password",
+  "tableName": "tableName",
+  "valueType": "Actual",
+  "referenceName": "oldReference"
+}
\ No newline at end of file
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 305d0af786f..7889afe0526 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -165,6 +165,7 @@ include(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common")
 include(":sdks:java:io:expansion-service")
 include(":sdks:java:io:file-based-io-tests")
 include(":sdks:java:io:bigquery-io-perf-tests")
+include(":sdks:java:io:cdap")
 include(":sdks:java:io:google-cloud-platform")
 include(":sdks:java:io:google-cloud-platform:expansion-service")
 include(":sdks:java:io:hadoop-common")