You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/11/23 00:43:40 UTC

[gobblin] branch master updated: [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (#3425)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cd19b9  [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (#3425)
6cd19b9 is described below

commit 6cd19b9e495d6cb39ebc886e8323476ff474838c
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Mon Nov 22 16:43:34 2021 -0800

    [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline (#3425)
    
    * [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container
    
    * address comments
    
    * address comments
    
    * [GOBBLIN-1573]Fix the ClassNotFoundException in streaming test pipeline
---
 .../string/KafkaRecordToStringConverter.java       | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/string/KafkaRecordToStringConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/string/KafkaRecordToStringConverter.java
new file mode 100644
index 0000000..6e30af6
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/string/KafkaRecordToStringConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.string;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.Converter;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+
+
+/**
+ * Implementation of {@link Converter} that converts a given {@link Object} to its {@link String} representation
+ */
+public class KafkaRecordToStringConverter extends Converter<Object, String, DecodeableKafkaRecord, String> {
+
+  @Override
+  public String convertSchema(Object inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+    return inputSchema.toString();
+  }
+
+  @Override
+  public Iterable<String> convertRecord(String outputSchema, DecodeableKafkaRecord inputRecord, WorkUnitState workUnit)
+      throws DataConversionException {
+    return new SingleRecordIterable<>(inputRecord.getValue().toString());
+  }
+}