You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/24 20:07:41 UTC

[beam] branch master updated: [BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7547cb0ea68 [BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)
7547cb0ea68 is described below

commit 7547cb0ea68143f9468089a7e9354f9fcfeed4a7
Author: Ekaterina Tatanova <63...@users.noreply.github.com>
AuthorDate: Tue May 24 23:07:33 2022 +0300

    [BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)
    
    * [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
    
    * [BEAM-14048] Fix checkstyle
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warnings
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Fix warning
    
    * [BEAM-14048] Remove unused dependencies
    
    * [BEAM-14048] Add needed dependencies
    
    * [BEAM-14048] Fix spotless
    
    * [BEAM-14048] Fix typo
    
    * [BEAM-14048] Use fori instead of stream
    
    * [BEAM-14048] Suppress warning
    
    * [BEAM-14048] Add used undeclared artifacts
    
    * [BEAM-14048] Change dependencies to test
    
    * Add context.
    
    * Fix dependencies issue
    
    * Add null annotation
    
    * Added CDAP plugin wrapper
    
    * [BEAM-14048] Refactoring
    
    * Add SuppressWarning.
    
    * Fix style.
    
    * Determine dependencies.
    
    * Added JavaDoc for PluginConstants class
    
    * Formatted code
    
    * Refactored dependencies
    
    * Made plugin fields nullable
    
    * Refactored getters
    
    * retrigger checks
    
    * [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
    
    * [BEAM-14048] Suppress warning
    
    * [BEAM-14081] Refactoring
    
    * Update maven repo
    
    * Update build.gradle
    
    * [BEAM-14053] Add prepareRun method in Plugin
    
    * [BEAM-14081] Refactoring
    
    * [BEAM-14053] Add Nullable annotations
    
    * [BEAM-14053] Suppress warning
    
    * [BEAM-14053] Fix Spotless
    
    * [BEAM-14053] Add avro dependency
    
    * [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
    
    * [BEAM-14048] Set macroFields
    
    * [BEAM-14081] Fix javadoc
    
    * [BEAM-14053] Use ServiceNow dependency in tests
    
    * Added hadoop_common dependency
    
    * used getDeclaredAnnotationsByType method
    
    Co-authored-by: vitaly.terentyev <vi...@akvelon.com>
    Co-authored-by: igor.krasavin <ig...@akvelon.com>
    Co-authored-by: Alex Kosolapov <al...@gmail.com>
    Co-authored-by: Elizaveta Lomteva <el...@akvelon.com>
    Co-authored-by: Elizaveta Lomteva <57...@users.noreply.github.com>
---
 .../src/main/resources/beam/suppressions.xml       |   1 +
 sdks/java/io/cdap/build.gradle                     |   4 +
 .../java/org/apache/beam/sdk/io/cdap/Plugin.java   | 233 +++++++++++++++++++++
 .../apache/beam/sdk/io/cdap/PluginConstants.java   | 107 ++++++++++
 .../org/apache/beam/sdk/io/cdap/PluginTest.java    | 119 +++++++++++
 5 files changed, 464 insertions(+)

diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index c050bc5540a..d8741025d80 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -89,6 +89,7 @@
   <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*zetasql.*BeamZetaSqlCatalog\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*BufferingPullSubscriberTest\.java" />
+  <suppress id="ForbidNonVendoredGuava" files=".*cdap.*Plugin\.java" />
   <suppress id="ForbidNonVendoredGuava" files=".*cdap.*PluginConfigInstantiationUtils\.java" />
 
   <!-- gRPC/protobuf exceptions -->
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index 9fe7d305a29..85ced547808 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -38,6 +38,7 @@ interface for integration with CDAP plugins."""
  */
 
 dependencies {
+    implementation library.java.avro
     implementation library.java.cdap_api
     implementation library.java.cdap_api_commons
     implementation (library.java.cdap_common) {
@@ -45,6 +46,8 @@ dependencies {
     }
     implementation library.java.cdap_etl_api
     implementation library.java.cdap_etl_api_spark
+    implementation library.java.hadoop_common
+    implementation library.java.hadoop_mapreduce_client_core
     implementation library.java.jackson_core
     implementation library.java.jackson_databind
     implementation library.java.guava
@@ -53,6 +56,7 @@ dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     testImplementation library.java.cdap_plugin_service_now
     testImplementation library.java.cdap_etl_api
+    testImplementation library.java.hadoop_common
     testImplementation library.java.vendored_guava_26_0_jre
     testImplementation library.java.junit
     testImplementation project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
new file mode 100644
index 00000000000..f733ddcfdc7
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class wrapper for a CDAP plugin. */
+@AutoValue
+@SuppressWarnings({
+  "rawtypes",
+  "unchecked",
+  "assignment.type.incompatible",
+  "initialization.fields.uninitialized"
+})
+public abstract class Plugin {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
+
+  protected @Nullable PluginConfig pluginConfig;
+  protected @Nullable Configuration hadoopConfiguration;
+  protected @Nullable SubmitterLifecycle cdapPluginObj;
+
+  /** Returns the batch context that {@link #prepareRun()} hands to the CDAP plugin. */
+  public abstract BatchContextImpl getContext();
+
+  /** Returns the main CDAP plugin class, which {@link #prepareRun()} instantiates on first use. */
+  public abstract Class<?> getPluginClass();
+
+  /** Returns the InputFormat or OutputFormat class of the plugin. */
+  public abstract Class<?> getFormatClass();
+
+  /** Returns the InputFormatProvider or OutputFormatProvider class of the plugin. */
+  public abstract Class<?> getFormatProviderClass();
+
+  /** Stores the given plugin config on this instance and returns {@code this} for chaining. */
+  public Plugin withConfig(PluginConfig pluginConfig) {
+    this.pluginConfig = pluginConfig;
+    return this;
+  }
+
+  /** Returns the config stored by {@link #withConfig}; {@code null} if none has been set. */
+  public PluginConfig getPluginConfig() {
+    return pluginConfig;
+  }
+
+  /**
+   * Calls {@link SubmitterLifecycle#prepareRun(Object)} method on the {@link #cdapPluginObj}
+   * passing needed {@param config} configuration object as a parameter. This method is needed for
+   * validating connection to the CDAP sink/source and performing initial tuning.
+   */
+  public void prepareRun() {
+    if (cdapPluginObj == null) {
+      InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+      cdapPluginObj =
+          (SubmitterLifecycle) instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+    }
+    try {
+      cdapPluginObj.prepareRun(getContext());
+      for (Map.Entry<String, String> entry :
+          getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
+        getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+      }
+    } catch (Exception e) {
+      LOG.error("Error while prepareRun", e);
+      throw new IllegalStateException("Error while prepareRun");
+    }
+  }
+
+  /** Registers the format class and the key/value classes in the plugin's Hadoop configuration. */
+  public Plugin withHadoopConfiguration(Class<?> formatKeyClass, Class<?> formatValueClass) {
+    PluginConstants.Format format = getFormatType();
+    PluginConstants.Hadoop hadoopKeys = getHadoopType();
+    Configuration conf = getHadoopConfiguration();
+
+    conf.setClass(hadoopKeys.getFormatClass(), getFormatClass(), format.getFormatClass());
+    conf.setClass(hadoopKeys.getKeyClass(), formatKeyClass, Object.class);
+    conf.setClass(hadoopKeys.getValueClass(), formatValueClass, Object.class);
+
+    return this;
+  }
+
+  /** Replaces the plugin's Hadoop configuration with the given one and returns {@code this}. */
+  public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
+    this.hadoopConfiguration = hadoopConfiguration;
+
+    return this;
+  }
+
+  /** Returns the plugin's Hadoop configuration, creating and caching a new one if unset. */
+  public Configuration getHadoopConfiguration() {
+    // Lazily initialized: the first call without a prior configuration creates a fresh instance.
+    hadoopConfiguration =
+        hadoopConfiguration != null ? hadoopConfiguration : new Configuration(false);
+    return hadoopConfiguration;
+  }
+
+  /** Returns whether this plugin is a source or a sink. */
+  public abstract PluginConstants.PluginType getPluginType();
+
+  /** Maps the plugin type to a format type: INPUT for a source, OUTPUT for anything else. */
+  private PluginConstants.Format getFormatType() {
+    boolean isSource = getPluginType() == PluginConstants.PluginType.SOURCE;
+
+    return isSource ? PluginConstants.Format.INPUT : PluginConstants.Format.OUTPUT;
+  }
+
+  /** Maps the plugin type to the Hadoop key set: SOURCE for a source, SINK for anything else. */
+  private PluginConstants.Hadoop getHadoopType() {
+    boolean isSource = getPluginType() == PluginConstants.PluginType.SOURCE;
+
+    return isSource ? PluginConstants.Hadoop.SOURCE : PluginConstants.Hadoop.SINK;
+  }
+
+  /** Returns SOURCE for a {@link BatchSource} and SINK for a {@link BatchSink}; else throws. */
+  public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
+      throws IllegalArgumentException {
+    if (BatchSource.class.isAssignableFrom(pluginClass)) {
+      return PluginConstants.PluginType.SOURCE;
+    }
+    if (BatchSink.class.isAssignableFrom(pluginClass)) {
+      return PluginConstants.PluginType.SINK;
+    }
+    throw new IllegalArgumentException("Provided class should be source or sink plugin");
+  }
+
+  /** Creates the batch context matching the parameter type of the plugin's prepareRun method. */
+  public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
+    List<Method> methods = new ArrayList<>(Arrays.asList(cdapPluginClass.getDeclaredMethods()));
+    if (cdapPluginClass.getSuperclass() != null) {
+      methods.addAll(Arrays.asList(cdapPluginClass.getSuperclass().getDeclaredMethods()));
+    }
+    Class<?> contextClass = null;
+    for (Method method : methods) {
+      // Bridge methods carry an erased parameter type and no-arg overloads have none: skip both.
+      if (method.getName().equals("prepareRun")
+          && !method.isBridge()
+          && method.getParameterCount() > 0) {
+        contextClass = method.getParameterTypes()[0];
+      }
+    }
+    if (contextClass == null) {
+      throw new IllegalStateException("Cannot determine context class");
+    }
+    // Only a sink context selects the sink implementation; anything else defaults to source.
+    if (contextClass.equals(BatchSinkContext.class)) {
+      return new BatchSinkContextImpl();
+    }
+    return new BatchSourceContextImpl();
+  }
+
+  /** Gets value of a plugin type. */
+  public Boolean isUnbounded() {
+    Boolean isUnbounded = null;
+
+    for (io.cdap.cdap.api.annotation.Plugin annotation :
+        getPluginClass().getDeclaredAnnotationsByType(io.cdap.cdap.api.annotation.Plugin.class)) {
+      String pluginType = annotation.type();
+      isUnbounded = pluginType != null && pluginType.startsWith("streaming");
+    }
+    if (isUnbounded == null) {
+      throw new IllegalArgumentException("CDAP plugin class must have Plugin annotation!");
+    }
+    return isUnbounded;
+  }
+
+  /** Creates a plugin wrapper, deriving the plugin type and context from the plugin class. */
+  public static Plugin create(
+      Class<?> newPluginClass, Class<?> newFormatClass, Class<?> newFormatProviderClass) {
+    return builder()
+        .setPluginClass(newPluginClass)
+        .setFormatClass(newFormatClass)
+        .setFormatProviderClass(newFormatProviderClass)
+        .setPluginType(Plugin.initPluginType(newPluginClass))
+        .setContext(Plugin.initContext(newPluginClass))
+        .build();
+  }
+
+  /** Returns a new AutoValue-generated builder for {@link Plugin}. */
+  public static Builder builder() {
+    return new AutoValue_Plugin.Builder();
+  }
+
+  /** Builder for a {@link Plugin}; each setter supplies one of its abstract properties. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setPluginClass(Class<?> newPluginClass);
+
+    public abstract Builder setFormatClass(Class<?> newFormatClass);
+
+    public abstract Builder setFormatProviderClass(Class<?> newFormatProviderClass);
+
+    public abstract Builder setPluginType(PluginConstants.PluginType newPluginType);
+
+    public abstract Builder setContext(BatchContextImpl context);
+
+    public abstract Plugin build();
+  }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java
new file mode 100644
index 00000000000..01938ae5c99
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import io.cdap.cdap.api.data.batch.OutputFormatProvider;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+/** Class for CDAP plugin constants. */
+public final class PluginConstants {
+  /** Role of the wrapped CDAP plugin: a source or a sink. */
+  public enum PluginType {
+    SOURCE,
+    SINK
+  }
+
+  /** Hadoop MapReduce format base types, each paired with its simple class name. */
+  public enum Format {
+    INPUT(InputFormat.class, "InputFormat"),
+    OUTPUT(OutputFormat.class, "OutputFormat");
+
+    private final Class<?> formatClass;
+    private final String formatName;
+
+    Format(Class<?> formatClass, String formatName) {
+      this.formatClass = formatClass;
+      this.formatName = formatName;
+    }
+
+    public Class<?> getFormatClass() {
+      return formatClass;
+    }
+
+    public String getFormatName() {
+      return formatName;
+    }
+  }
+
+  /** CDAP format provider types, each paired with its simple class name. */
+  public enum FormatProvider {
+    INPUT(InputFormatProvider.class, "InputFormatProvider"),
+    OUTPUT(OutputFormatProvider.class, "OutputFormatProvider");
+
+    private final Class<?> formatProviderClass;
+    private final String formatProviderName;
+
+    FormatProvider(Class<?> formatProviderClass, String formatProviderName) {
+      this.formatProviderClass = formatProviderClass;
+      this.formatProviderName = formatProviderName;
+    }
+
+    public Class<?> getFormatProviderClass() {
+      return formatProviderClass;
+    }
+
+    public String getFormatProviderName() {
+      return formatProviderName;
+    }
+  }
+
+  /** Hadoop configuration property names for the format, key and value classes per plugin type. */
+  public enum Hadoop {
+    SOURCE("mapreduce.job.inputformat.class", "key.class", "value.class"),
+    SINK(
+        "mapreduce.job.outputformat.class",
+        "mapreduce.job.output.key.class",
+        "mapreduce.job.output.value.class");
+
+    private final String formatClass;
+    private final String keyClass;
+    private final String valueClass;
+
+    Hadoop(String formatClassName, String keyClass, String valueClass) {
+      this.formatClass = formatClassName;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+
+    public String getFormatClass() {
+      return formatClass;
+    }
+
+    public String getKeyClass() {
+      return keyClass;
+    }
+
+    public String getValueClass() {
+      return valueClass;
+    }
+  }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java
new file mode 100644
index 00000000000..501c91b6cda
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.io.MapWritable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class PluginTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PluginTest.class);
+
+  private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+      ImmutableMap.<String, java.lang.Object>builder()
+          .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+          .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+          .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+          .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+          .put(ServiceNowConstants.PROPERTY_USER, "user")
+          .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+          .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+          .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+          .put("referenceName", "oldReference")
+          .build();
+
+  private static final String REFERENCE_NAME_PARAM_NAME = "referenceName";
+
+  /** Config for ServiceNow Batch Source plugin. */
+  public ServiceNowSourceConfig serviceNowSourceConfig =
+      new ConfigWrapper<>(ServiceNowSourceConfig.class)
+          .withParams(TEST_SERVICE_NOW_PARAMS_MAP)
+          .setParam(REFERENCE_NAME_PARAM_NAME, "some reference name")
+          .build();
+
+  @Test
+  public void testBuildingSourcePluginWithCDAPClasses() {
+    try {
+      Plugin serviceNowSourcePlugin =
+          Plugin.create(
+                  ServiceNowSource.class,
+                  ServiceNowInputFormat.class,
+                  SourceInputFormatProvider.class)
+              .withConfig(serviceNowSourceConfig)
+              .withHadoopConfiguration(Schema.class, MapWritable.class);
+
+      assertEquals(ServiceNowSource.class, serviceNowSourcePlugin.getPluginClass());
+      assertEquals(ServiceNowInputFormat.class, serviceNowSourcePlugin.getFormatClass());
+      assertEquals(
+          SourceInputFormatProvider.class, serviceNowSourcePlugin.getFormatProviderClass());
+      assertEquals(serviceNowSourceConfig, serviceNowSourcePlugin.getPluginConfig());
+      assertEquals(
+          ServiceNowInputFormat.class,
+          serviceNowSourcePlugin
+              .getHadoopConfiguration()
+              .getClass(
+                  PluginConstants.Hadoop.SOURCE.getFormatClass(),
+                  PluginConstants.Format.INPUT.getFormatClass()));
+    } catch (Exception e) {
+      LOG.error("Error occurred while building the ServiceNow Source Plugin", e);
+      fail();
+    }
+  }
+
+  @Test
+  public void testSettingPluginType() {
+    Plugin plugin =
+        Plugin.create(
+                ServiceNowSource.class,
+                ServiceNowInputFormat.class,
+                SourceInputFormatProvider.class)
+            .withConfig(serviceNowSourceConfig)
+            .withHadoopConfiguration(Schema.class, MapWritable.class);
+    PluginConstants.PluginType actualType = plugin.getPluginType();
+    assertEquals(PluginConstants.PluginType.SOURCE, actualType);
+  }
+
+  @Test
+  @SuppressWarnings("UnusedVariable")
+  public void testSettingPluginTypeFailed() {
+    try {
+      Plugin serviceNowSourcePlugin =
+          Plugin.create(Object.class, Object.class, Object.class)
+              .withConfig(serviceNowSourceConfig)
+              .withHadoopConfiguration(Schema.class, MapWritable.class);
+      fail("This should have thrown an exception");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Provided class should be source or sink plugin", e.getMessage());
+    }
+  }
+}