You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/05/24 20:07:41 UTC
[beam] branch master updated: [BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7547cb0ea68 [BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)
7547cb0ea68 is described below
commit 7547cb0ea68143f9468089a7e9354f9fcfeed4a7
Author: Ekaterina Tatanova <63...@users.noreply.github.com>
AuthorDate: Tue May 24 23:07:33 2022 +0300
[BEAM-14053] [CdapIO] Add wrapper class for CDAP plugin (#17150)
* [BEAM-14048] Add ConfigWrapper for building CDAP PluginConfigs
* [BEAM-14048] Fix checkstyle
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warnings
* [BEAM-14048] Fix warning
* [BEAM-14048] Fix warning
* [BEAM-14048] Remove unused dependencies
* [BEAM-14048] Add needed dependencies
* [BEAM-14048] Fix spotless
* [BEAM-14048] Fix typo
* [BEAM-14048] Use fori instead of stream
* [BEAM-14048] Suppress warning
* [BEAM-14048] Add used undeclared artifacts
* [BEAM-14048] Change dependencies to test
* Add context.
* Fix dependencies issue
* Add null annotation
* Added CDAP plugin wrapper
* [BEAM-14048] Refactoring
* Add SuppressWarning.
* Fix style.
* Determine dependencies.
* Added JavaDoc for PluginConstants class
* Formatted code
* Refactored dependencies
* Made plugin fields nullable
* Refactored getters
* retrigger checks
* [BEAM-14048] Use CDAP InstantiatorFactory for creating config objects
* [BEAM-14048] Suppress warning
* [BEAM-14081] Refactoring
* Update maven repo
* Update build.gradle
* [BEAM-14053] Add prepareRun method in Plugin
* [BEAM-14081] Refactoring
* [BEAM-14053] Add Nullable annotations
* [BEAM-14053] Suppress warning
* [BEAM-14053] Fix Spotless
* [BEAM-14053] Add avro dependency
* [BEAM-14048] Use ServiceNow CDAP dependency from Maven central
* [BEAM-14048] Set macroFields
* [BEAM-14081] Fix javadoc
* [BEAM-14053] Use ServiceNow dependency in tests
* Added hadoop_common dependency
* used getDeclaredAnnotationsByType method
Co-authored-by: vitaly.terentyev <vi...@akvelon.com>
Co-authored-by: igor.krasavin <ig...@akvelon.com>
Co-authored-by: Alex Kosolapov <al...@gmail.com>
Co-authored-by: Elizaveta Lomteva <el...@akvelon.com>
Co-authored-by: Elizaveta Lomteva <57...@users.noreply.github.com>
---
.../src/main/resources/beam/suppressions.xml | 1 +
sdks/java/io/cdap/build.gradle | 4 +
.../java/org/apache/beam/sdk/io/cdap/Plugin.java | 233 +++++++++++++++++++++
.../apache/beam/sdk/io/cdap/PluginConstants.java | 107 ++++++++++
.../org/apache/beam/sdk/io/cdap/PluginTest.java | 119 +++++++++++
5 files changed, 464 insertions(+)
diff --git a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
index c050bc5540a..d8741025d80 100644
--- a/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/suppressions.xml
@@ -89,6 +89,7 @@
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*ExpressionConverter\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*zetasql.*BeamZetaSqlCatalog\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*pubsublite.*BufferingPullSubscriberTest\.java" />
+ <suppress id="ForbidNonVendoredGuava" files=".*cdap.*Plugin\.java" />
<suppress id="ForbidNonVendoredGuava" files=".*cdap.*PluginConfigInstantiationUtils\.java" />
<!-- gRPC/protobuf exceptions -->
diff --git a/sdks/java/io/cdap/build.gradle b/sdks/java/io/cdap/build.gradle
index 9fe7d305a29..85ced547808 100644
--- a/sdks/java/io/cdap/build.gradle
+++ b/sdks/java/io/cdap/build.gradle
@@ -38,6 +38,7 @@ interface for integration with CDAP plugins."""
*/
dependencies {
+ implementation library.java.avro
implementation library.java.cdap_api
implementation library.java.cdap_api_commons
implementation (library.java.cdap_common) {
@@ -45,6 +46,8 @@ dependencies {
}
implementation library.java.cdap_etl_api
implementation library.java.cdap_etl_api_spark
+ implementation library.java.hadoop_common
+ implementation library.java.hadoop_mapreduce_client_core
implementation library.java.jackson_core
implementation library.java.jackson_databind
implementation library.java.guava
@@ -53,6 +56,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
testImplementation library.java.cdap_plugin_service_now
testImplementation library.java.cdap_etl_api
+ testImplementation library.java.hadoop_common
testImplementation library.java.vendored_guava_26_0_jre
testImplementation library.java.junit
testImplementation project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
new file mode 100644
index 00000000000..f733ddcfdc7
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/Plugin.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.common.lang.InstantiatorFactory;
+import io.cdap.cdap.etl.api.SubmitterLifecycle;
+import io.cdap.cdap.etl.api.batch.BatchSink;
+import io.cdap.cdap.etl.api.batch.BatchSinkContext;
+import io.cdap.cdap.etl.api.batch.BatchSource;
+import io.cdap.cdap.etl.api.batch.BatchSourceContext;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.reflect.Nullable;
+import org.apache.beam.sdk.io.cdap.context.BatchContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSinkContextImpl;
+import org.apache.beam.sdk.io.cdap.context.BatchSourceContextImpl;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Class wrapper for a CDAP plugin. */
+@AutoValue
+@SuppressWarnings({
+ "rawtypes",
+ "unchecked",
+ "assignment.type.incompatible",
+ "initialization.fields.uninitialized"
+})
+public abstract class Plugin {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Plugin.class);
+
+ protected @Nullable PluginConfig pluginConfig;
+ protected @Nullable Configuration hadoopConfiguration;
+ protected @Nullable SubmitterLifecycle cdapPluginObj;
+
+ /** Gets the context of a plugin. */
+ public abstract BatchContextImpl getContext();
+
+ /** Gets the main class of a plugin. */
+ public abstract Class<?> getPluginClass();
+
+ /** Gets InputFormat or OutputFormat class for a plugin. */
+ public abstract Class<?> getFormatClass();
+
+ /** Gets InputFormatProvider or OutputFormatProvider class for a plugin. */
+ public abstract Class<?> getFormatProviderClass();
+
+ /** Sets a plugin config. */
+ public Plugin withConfig(PluginConfig pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ return this;
+ }
+
+ /** Gets a plugin config. */
+ public PluginConfig getPluginConfig() {
+ return pluginConfig;
+ }
+
+ /**
+ * Calls {@link SubmitterLifecycle#prepareRun(Object)} method on the {@link #cdapPluginObj}
+ * passing needed {@param config} configuration object as a parameter. This method is needed for
+ * validating connection to the CDAP sink/source and performing initial tuning.
+ */
+ public void prepareRun() {
+ if (cdapPluginObj == null) {
+ InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
+ cdapPluginObj =
+ (SubmitterLifecycle) instantiatorFactory.get(TypeToken.of(getPluginClass())).create();
+ }
+ try {
+ cdapPluginObj.prepareRun(getContext());
+ for (Map.Entry<String, String> entry :
+ getContext().getInputFormatProvider().getInputFormatConfiguration().entrySet()) {
+ getHadoopConfiguration().set(entry.getKey(), entry.getValue());
+ }
+ } catch (Exception e) {
+ LOG.error("Error while prepareRun", e);
+ throw new IllegalStateException("Error while prepareRun");
+ }
+ }
+
+ /** Sets a plugin Hadoop configuration. */
+ public Plugin withHadoopConfiguration(Class<?> formatKeyClass, Class<?> formatValueClass) {
+ PluginConstants.Format formatType = getFormatType();
+ PluginConstants.Hadoop hadoopType = getHadoopType();
+
+ getHadoopConfiguration()
+ .setClass(hadoopType.getFormatClass(), getFormatClass(), formatType.getFormatClass());
+ getHadoopConfiguration().setClass(hadoopType.getKeyClass(), formatKeyClass, Object.class);
+ getHadoopConfiguration().setClass(hadoopType.getValueClass(), formatValueClass, Object.class);
+
+ return this;
+ }
+
+ /** Sets a plugin Hadoop configuration. */
+ public Plugin withHadoopConfiguration(Configuration hadoopConfiguration) {
+ this.hadoopConfiguration = hadoopConfiguration;
+
+ return this;
+ }
+
+ /** Gets a plugin Hadoop configuration. */
+ public Configuration getHadoopConfiguration() {
+ if (hadoopConfiguration == null) {
+ hadoopConfiguration = new Configuration(false);
+ }
+ return hadoopConfiguration;
+ }
+
+ /** Gets a plugin type. */
+ public abstract PluginConstants.PluginType getPluginType();
+
+ /** Gets a format type. */
+ private PluginConstants.Format getFormatType() {
+ return getPluginType() == PluginConstants.PluginType.SOURCE
+ ? PluginConstants.Format.INPUT
+ : PluginConstants.Format.OUTPUT;
+ }
+
+ /** Gets a Hadoop type. */
+ private PluginConstants.Hadoop getHadoopType() {
+ return getPluginType() == PluginConstants.PluginType.SOURCE
+ ? PluginConstants.Hadoop.SOURCE
+ : PluginConstants.Hadoop.SINK;
+ }
+
+ /** Gets value of a plugin type. */
+ public static PluginConstants.PluginType initPluginType(Class<?> pluginClass)
+ throws IllegalArgumentException {
+ if (BatchSource.class.isAssignableFrom(pluginClass)) {
+ return PluginConstants.PluginType.SOURCE;
+ } else if (BatchSink.class.isAssignableFrom(pluginClass)) {
+ return PluginConstants.PluginType.SINK;
+ } else {
+ throw new IllegalArgumentException("Provided class should be source or sink plugin");
+ }
+ }
+
+ public static BatchContextImpl initContext(Class<?> cdapPluginClass) {
+ // Init context and determine input or output
+ Class<?> contextClass = null;
+ List<Method> methods = new ArrayList<>(Arrays.asList(cdapPluginClass.getDeclaredMethods()));
+ if (cdapPluginClass.getSuperclass() != null) {
+ methods.addAll(Arrays.asList(cdapPluginClass.getSuperclass().getDeclaredMethods()));
+ }
+ for (Method method : methods) {
+ if (method.getName().equals("prepareRun")) {
+ contextClass = method.getParameterTypes()[0];
+ }
+ }
+ if (contextClass == null) {
+ throw new IllegalStateException("Cannot determine context class");
+ }
+
+ if (contextClass.equals(BatchSourceContext.class)) {
+ return new BatchSourceContextImpl();
+ } else if (contextClass.equals(BatchSinkContext.class)) {
+ return new BatchSinkContextImpl();
+ } else {
+ return new BatchSourceContextImpl();
+ }
+ }
+
+ /** Gets value of a plugin type. */
+ public Boolean isUnbounded() {
+ Boolean isUnbounded = null;
+
+ for (io.cdap.cdap.api.annotation.Plugin annotation :
+ getPluginClass().getDeclaredAnnotationsByType(io.cdap.cdap.api.annotation.Plugin.class)) {
+ String pluginType = annotation.type();
+ isUnbounded = pluginType != null && pluginType.startsWith("streaming");
+ }
+ if (isUnbounded == null) {
+ throw new IllegalArgumentException("CDAP plugin class must have Plugin annotation!");
+ }
+ return isUnbounded;
+ }
+
+ /** Creates a plugin instance. */
+ public static Plugin create(
+ Class<?> newPluginClass, Class<?> newFormatClass, Class<?> newFormatProviderClass) {
+ return builder()
+ .setPluginClass(newPluginClass)
+ .setFormatClass(newFormatClass)
+ .setFormatProviderClass(newFormatProviderClass)
+ .setPluginType(Plugin.initPluginType(newPluginClass))
+ .setContext(Plugin.initContext(newPluginClass))
+ .build();
+ }
+
+ /** Creates a plugin builder instance. */
+ public static Builder builder() {
+ return new AutoValue_Plugin.Builder();
+ }
+
+ /** Builder class for a {@link Plugin}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setPluginClass(Class<?> newPluginClass);
+
+ public abstract Builder setFormatClass(Class<?> newFormatClass);
+
+ public abstract Builder setFormatProviderClass(Class<?> newFormatProviderClass);
+
+ public abstract Builder setPluginType(PluginConstants.PluginType newPluginType);
+
+ public abstract Builder setContext(BatchContextImpl context);
+
+ public abstract Plugin build();
+ }
+}
diff --git a/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java
new file mode 100644
index 00000000000..01938ae5c99
--- /dev/null
+++ b/sdks/java/io/cdap/src/main/java/org/apache/beam/sdk/io/cdap/PluginConstants.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import io.cdap.cdap.api.data.batch.InputFormatProvider;
+import io.cdap.cdap.api.data.batch.OutputFormatProvider;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+/** Class for CDAP plugin constants. */
+public final class PluginConstants {
+ /** Plugin types. */
+ public enum PluginType {
+ SOURCE,
+ SINK
+ }
+
+ /** Format types. */
+ public enum Format {
+ INPUT(InputFormat.class, "InputFormat"),
+ OUTPUT(OutputFormat.class, "OutputFormat");
+
+ private final Class<?> formatClass;
+ private final String formatName;
+
+ Format(Class<?> formatClass, String formatName) {
+ this.formatClass = formatClass;
+ this.formatName = formatName;
+ }
+
+ public Class<?> getFormatClass() {
+ return formatClass;
+ }
+
+ public String getFormatName() {
+ return formatName;
+ }
+ }
+
+ /** Format provider types. */
+ public enum FormatProvider {
+ INPUT(InputFormatProvider.class, "InputFormatProvider"),
+ OUTPUT(OutputFormatProvider.class, "OutputFormatProvider");
+
+ private final Class<?> formatProviderClass;
+ private final String formatProviderName;
+
+ FormatProvider(Class<?> formatProviderClass, String formatProviderName) {
+ this.formatProviderClass = formatProviderClass;
+ this.formatProviderName = formatProviderName;
+ }
+
+ public Class<?> getFormatProviderClass() {
+ return formatProviderClass;
+ }
+
+ public String getFormatProviderName() {
+ return formatProviderName;
+ }
+ }
+
+ /** Hadoop types. */
+ public enum Hadoop {
+ SOURCE("mapreduce.job.inputformat.class", "key.class", "value.class"),
+ SINK(
+ "mapreduce.job.outputformat.class",
+ "mapreduce.job.output.key.class",
+ "mapreduce.job.output.value.class");
+
+ private final String formatClass;
+ private final String keyClass;
+ private final String valueClass;
+
+ Hadoop(String formatClassName, String keyClass, String valueClass) {
+ this.formatClass = formatClassName;
+ this.keyClass = keyClass;
+ this.valueClass = valueClass;
+ }
+
+ public String getFormatClass() {
+ return formatClass;
+ }
+
+ public String getKeyClass() {
+ return keyClass;
+ }
+
+ public String getValueClass() {
+ return valueClass;
+ }
+ }
+}
diff --git a/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java
new file mode 100644
index 00000000000..501c91b6cda
--- /dev/null
+++ b/sdks/java/io/cdap/src/test/java/org/apache/beam/sdk/io/cdap/PluginTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.cdap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.servicenow.source.ServiceNowInputFormat;
+import io.cdap.plugin.servicenow.source.ServiceNowSource;
+import io.cdap.plugin.servicenow.source.ServiceNowSourceConfig;
+import io.cdap.plugin.servicenow.source.util.ServiceNowConstants;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.io.MapWritable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class PluginTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(PluginTest.class);
+
+ private static final ImmutableMap<String, Object> TEST_SERVICE_NOW_PARAMS_MAP =
+ ImmutableMap.<String, java.lang.Object>builder()
+ .put(ServiceNowConstants.PROPERTY_CLIENT_ID, "clientId")
+ .put(ServiceNowConstants.PROPERTY_CLIENT_SECRET, "clientSecret")
+ .put(ServiceNowConstants.PROPERTY_API_ENDPOINT, "https://www.google.com")
+ .put(ServiceNowConstants.PROPERTY_QUERY_MODE, "Table")
+ .put(ServiceNowConstants.PROPERTY_USER, "user")
+ .put(ServiceNowConstants.PROPERTY_PASSWORD, "password")
+ .put(ServiceNowConstants.PROPERTY_TABLE_NAME, "tableName")
+ .put(ServiceNowConstants.PROPERTY_VALUE_TYPE, "Actual")
+ .put("referenceName", "oldReference")
+ .build();
+
+ private static final String REFERENCE_NAME_PARAM_NAME = "referenceName";
+
+ /** Config for ServiceNow Batch Source plugin. */
+ public ServiceNowSourceConfig serviceNowSourceConfig =
+ new ConfigWrapper<>(ServiceNowSourceConfig.class)
+ .withParams(TEST_SERVICE_NOW_PARAMS_MAP)
+ .setParam(REFERENCE_NAME_PARAM_NAME, "some reference name")
+ .build();
+
+ @Test
+ public void testBuildingSourcePluginWithCDAPClasses() {
+ try {
+ Plugin serviceNowSourcePlugin =
+ Plugin.create(
+ ServiceNowSource.class,
+ ServiceNowInputFormat.class,
+ SourceInputFormatProvider.class)
+ .withConfig(serviceNowSourceConfig)
+ .withHadoopConfiguration(Schema.class, MapWritable.class);
+
+ assertEquals(ServiceNowSource.class, serviceNowSourcePlugin.getPluginClass());
+ assertEquals(ServiceNowInputFormat.class, serviceNowSourcePlugin.getFormatClass());
+ assertEquals(
+ SourceInputFormatProvider.class, serviceNowSourcePlugin.getFormatProviderClass());
+ assertEquals(serviceNowSourceConfig, serviceNowSourcePlugin.getPluginConfig());
+ assertEquals(
+ ServiceNowInputFormat.class,
+ serviceNowSourcePlugin
+ .getHadoopConfiguration()
+ .getClass(
+ PluginConstants.Hadoop.SOURCE.getFormatClass(),
+ PluginConstants.Format.INPUT.getFormatClass()));
+ } catch (Exception e) {
+ LOG.error("Error occurred while building the ServiceNow Source Plugin", e);
+ fail();
+ }
+ }
+
+ @Test
+ public void testSettingPluginType() {
+ Plugin serviceNowSourcePlugin =
+ Plugin.create(
+ ServiceNowSource.class,
+ ServiceNowInputFormat.class,
+ SourceInputFormatProvider.class)
+ .withConfig(serviceNowSourceConfig)
+ .withHadoopConfiguration(Schema.class, MapWritable.class);
+
+ assertEquals(PluginConstants.PluginType.SOURCE, serviceNowSourcePlugin.getPluginType());
+ }
+
+ @Test
+ @SuppressWarnings("UnusedVariable")
+ public void testSettingPluginTypeFailed() {
+ try {
+ Plugin serviceNowSourcePlugin =
+ Plugin.create(Object.class, Object.class, Object.class)
+ .withConfig(serviceNowSourceConfig)
+ .withHadoopConfiguration(Schema.class, MapWritable.class);
+ fail("This should have thrown an exception");
+ } catch (Exception e) {
+ assertEquals("Provided class should be source or sink plugin", e.getMessage());
+ }
+ }
+}