Posted to user@gobblin.apache.org by "Kidwell, Jack" <Ja...@comcast.com> on 2018/04/11 17:47:52 UTC

KafkaSimpleJsonExtractor

Hi,

Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java in order to see both kafka record keys and values.

How does one configure a job to achieve it?


RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
Thank you for spending your time on this.

See https://issues.apache.org/jira/browse/GOBBLIN-481.

Jack Kidwell

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Saturday, April 28, 2018 12:07 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Great, this looks fine to me. Can you raise a JIRA with the issue description and thereafter raise a PR for this change?
Thanks,
Vicky

On Sat, Apr 28, 2018 at 3:22 AM, Kidwell, Jack <Ja...@comcast.com>> wrote:
I wiped my windows directory, but because I was using tags/gobblin_0.10.0 it’s pointless to troubleshoot it.

On my fedora workstation, I’m using tags/release-0.12.0-rc2, and patched
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java.

The tests pass. How do I get this trivial change in? Do I make a branch and a pull request?

Here’s the output:

./gradlew :gobblin-modules:gobblin-kafka-common:test
Parallel execution with configuration on demand is an incubating feature.
:buildSrc:compileJava UP-TO-DATE
:buildSrc:compileGroovy UP-TO-DATE
:buildSrc:processResources UP-TO-DATE
:buildSrc:classes UP-TO-DATE
:buildSrc:jar UP-TO-DATE
:buildSrc:assemble UP-TO-DATE
:buildSrc:compileTestJava UP-TO-DATE
:buildSrc:compileTestGroovy UP-TO-DATE
:buildSrc:processTestResources UP-TO-DATE
:buildSrc:testClasses UP-TO-DATE
:buildSrc:test UP-TO-DATE
:buildSrc:check UP-TO-DATE
:buildSrc:build UP-TO-DATE
Build property: gobblinFlavor=standard
Build property: jdkVersion=1.8
Build property: sonatypeArtifactRepository=https://oss.sonatype.org/service/local/staging/deploy/maven2/
Build property: sonatypeArtifactSnapshotRepository=https://oss.sonatype.org/content/repositories/snapshots/
Build property: nexusArtifactRepository=https://repository.apache.org/service/local/staging/deploy/maven2
Build property: nexusArtifactSnapshotRepository=https://repository.apache.org/content/repositories/snapshots
Build property: doNotSignArtifacts=false
Build property: avroVersion=1.8.1
Build property: awsVersion=1.11.8
Build property: bytemanVersion=2.2.1
Build property: confluentVersion=2.0.1
Build property: hadoopVersion=2.3.0
Build property: hiveVersion=1.0.1
Build property: kafka08Version=0.8.2.2
Build property: kafka09Version=0.9.0.1
Build property: pegasusVersion=11.0.0
Build property: salesforceVersion=37.0.3
Detected Gradle version major=2 minor=13
Build property: publishToMaven=false
Build property: publishToNexus=false
Release Version: 0.12.0
:gobblin-api:compileJava
:gobblin-utility:processResources
:gobblin-metrics-libs:gobblin-metrics-base:generateAvro UP-TO-DATE
:gobblin-metrics-libs:gobblin-metrics-base:processResources
:gobblin-modules:gobblin-metrics-graphite:processResources UP-TO-DATE
:gobblin-modules:gobblin-metrics-hadoop:processResources UP-TO-DATE
:gobblin-modules:gobblin-metrics-influxdb:processResources UP-TO-DATE
:gobblin-metrics-libs:gobblin-metrics:processResources UP-TO-DATE
:gobblin-modules:gobblin-codecs:processResources UP-TO-DATE
:gobblin-core-base:processResources UP-TO-DATE
:gobblin-test-utils:processResources
:gobblin-config-management:gobblin-config-core:processResources
:gobblin-config-management:gobblin-config-client:processResources UP-TO-DATE
:gobblin-modules:gobblin-kafka-common:processResources UP-TO-DATE
:gobblin-modules:gobblin-kafka-common:processTestResources
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/runtime/BasicTestControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java:32: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
3 warnings
:gobblin-api:processResources UP-TO-DATE
:gobblin-api:classes
:gobblin-api:jar
:gobblin-utility:compileJava
:gobblin-modules:gobblin-codecs:compileJava
:gobblin-modules:gobblin-codecs:classes
:gobblin-modules:gobblin-codecs:jar
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:gobblin-utility:classes
:gobblin-utility:jar
:gobblin-metrics-libs:gobblin-metrics-base:compileJava
:gobblin-test-utils:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

:gobblin-test-utils:classes
:gobblin-test-utils:jar
:gobblin-config-management:gobblin-config-core:compileJava
:gobblin-config-management:gobblin-config-core:classes
:gobblin-config-management:gobblin-config-core:jar
:gobblin-config-management:gobblin-config-client:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-metrics-libs:gobblin-metrics-base:classes
:gobblin-metrics-libs:gobblin-metrics-base:jar
:gobblin-modules:gobblin-metrics-graphite:compileJava
:gobblin-modules:gobblin-metrics-hadoop:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-graphite/src/main/java/org/apache/gobblin/metrics/graphite/GraphiteEventReporter.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

:gobblin-modules:gobblin-metrics-graphite:classes
:gobblin-modules:gobblin-metrics-graphite:jar
:gobblin-modules:gobblin-metrics-influxdb:compileJava
:gobblin-modules:gobblin-metrics-hadoop:classes
:gobblin-modules:gobblin-metrics-hadoop:jar
:gobblin-config-management:gobblin-config-client:classes
:gobblin-config-management:gobblin-config-client:jar
Note: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-influxdb/src/main/java/org/apache/gobblin/metrics/influxdb/InfluxDBEventReporter.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
:gobblin-modules:gobblin-metrics-influxdb:classes
:gobblin-modules:gobblin-metrics-influxdb:jar
:gobblin-metrics-libs:gobblin-metrics:compileJava
:gobblin-metrics-libs:gobblin-metrics:classes
:gobblin-metrics-libs:gobblin-metrics:jar
:gobblin-core-base:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-core-base:classes
:gobblin-core-base:jar
:gobblin-modules:gobblin-kafka-common:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-modules:gobblin-kafka-common:classes
:gobblin-modules:gobblin-kafka-common:compileTestJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-modules:gobblin-kafka-common:testClasses
:gobblin-modules:gobblin-kafka-common:test

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString PASSED

BUILD SUCCESSFUL

Total time: 37.075 secs


From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Wednesday, April 25, 2018 11:56 PM

To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

It is nice to hear that you got it working.
Could you please post the details of the error that you are seeing in the Windows build? Maybe someone can fix it. Also, on Windows you could try building with Cygwin; I am not sure how you were doing it. I am personally using Ubuntu for development.
We may need to get this change committed. Can you run the tests too and see whether they all pass with these changes?
Lastly, move to the latest Gobblin distribution. You may have to rework the patch, as the package names changed after the project moved to Apache incubation.

Thanks,
Vicky


On Thu, Apr 26, 2018 at 3:48 AM, Kidwell, Jack <Ja...@comcast.com>> wrote:
I set up a fedora workstation where gobblin built the first time. It was failing under windows.

I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution, installed it and ran with it. And it works using these job configs:

source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
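
For anyone who wants to check these two settings before launching a job, here is a minimal sketch in plain Java. It only parses the two lines quoted above with java.util.Properties; the class name JobConfigSketch is made up for illustration and nothing here calls Gobblin itself. Going by the source path of the 0.12.0 tag mentioned elsewhere in this thread, the class prefix on Apache releases would presumably be org.apache.gobblin rather than gobblin, but that is an assumption to verify against your build.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of Gobblin: parses the job properties quoted above.
class JobConfigSketch {
    // The two job properties from the message, in .properties syntax.
    static final String JOB_CONFIG =
        "source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource\n"
      + "gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor\n";

    // Loads .properties text into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader should not fail, but load() declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(JOB_CONFIG);
        System.out.println("source.class = " + props.getProperty("source.class"));
        System.out.println("extractorType = "
            + props.getProperty("gobblin.source.kafka.extractorType"));
    }
}
```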


Can this patch be applied to the repository:

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
index 9001df134..d0813f9a3 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
@@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;

+import gobblin.annotation.Alias;
import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import gobblin.source.extractor.Extractor;

+@Alias("KafkaSimpleJsonExtractor")
public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {

     private static final Gson gson = new Gson();

From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Thursday, April 19, 2018 11:31 PM

To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

Which script are you using to make the build? Are you not using one of these?


  1.  Skip tests and build the distribution by running "./gradlew build -x findbugsMain -x test -x rat -x checkstyleMain". The distribution will be created in the build/gobblin-distribution/distributions directory. (or)
  2.  Run tests and build the distribution (requires Maven) by running "./gradlew build". The distribution will be created in the build/gobblin-distribution/distributions directory.
These are taken from https://github.com/apache/incubator-gobblin/blob/master/README.md

I did not get a chance to add the Alias annotation; I am a bit busy this weekend. I will try to find some time and see if I can test it.




On Fri, Apr 20, 2018 at 5:58 AM, Kidwell, Jack <Ja...@comcast.com>> wrote:
So far I have failed to build gobblin. I’m currently stuck on this error:

FAILURE: Build failed with an exception.

* Where:
Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32

* What went wrong:
A problem occurred evaluating script.
> Failed to apply plugin [id 'org.gradle.java']
   > Value is null


Did you have a chance to research adding the annotation, @Alias("KafkaSimpleJsonExtractor")?


From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Saturday, April 14, 2018 12:19 AM

To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

Yes, you seem to be correct; the code never lies :)

Can you try implementing your own customised Extractor that extends KafkaSimpleJsonExtractor and carries the Alias annotation?

The other option would be to add the Alias annotation to KafkaSimpleJsonExtractor itself. I need to see what implications that would have and why it was not done earlier. I need some time to look deeper into it and hope to find some during the weekend.




On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <Ja...@comcast.com>> wrote:
Our correspondence helps with understanding Gobblin. ☺

We’re focusing on the source, so no writers are involved yet. We want to extract both the key and value from each kafka message.

There are three classes that extend abstract class, KafkaSource:
KafkaDeserializerSource,
KafkaSimpleSource and
UniversalKafkaSource.

Each has method, getExtractor(), that returns these respective instances:

KafkaDeserializerExtractor;
KafkaSimpleExtractor and
a class found by ClassAliasResolver.resolveClass.

The first two have method, decodeRecord(), that takes parameter, ByteArrayBasedKafkaRecord, and each calls ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work.

The UniversalKafkaSource class looks interesting because it will search for a class that extends KafkaExtractor and instantiate it. But there’s a catch: the class must be annotated with @Alias. If class KafkaSimpleJsonExtractor were annotated with an @Alias, I think I could use the UniversalKafkaSource along with property, gobblin.source.kafka.extractorType. As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(), the result we seek.

Or did we miss something?
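
To make the alias catch described above concrete, here is a minimal, self-contained sketch of an annotation-based lookup. Everything in it is a stand-in: the Alias annotation, BaseExtractor, the two extractor classes and AliasLookupSketch are hypothetical and written only for this illustration. It also assumes a fixed candidate list, which is not how Gobblin's ClassAliasResolver discovers classes. The point is only that a class without the annotation cannot be found by its alias.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Optional;

// Hypothetical stand-in for an alias annotation; it must be retained at
// runtime so that reflection can see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Alias {
    String value();
}

// Hypothetical stand-ins for an extractor hierarchy.
abstract class BaseExtractor {}

@Alias("KafkaSimpleJsonExtractor")
class AnnotatedExtractor extends BaseExtractor {}

class UnannotatedExtractor extends BaseExtractor {}

class AliasLookupSketch {
    // Assumption: a fixed candidate list instead of real classpath discovery.
    static final Class<?>[] CANDIDATES = {
        AnnotatedExtractor.class, UnannotatedExtractor.class
    };

    // Returns the candidate whose @Alias value equals the requested alias.
    static Optional<Class<?>> resolve(String alias) {
        for (Class<?> candidate : CANDIDATES) {
            Alias annotation = candidate.getAnnotation(Alias.class);
            if (annotation != null && annotation.value().equals(alias)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // The annotated class is found by its alias.
        System.out.println(resolve("KafkaSimpleJsonExtractor").isPresent());
        // The class without @Alias is not found, even by its own simple name.
        System.out.println(resolve("UnannotatedExtractor").isPresent());
    }
}
```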

From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Friday, April 13, 2018 12:01 AM

To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

Do you not have this configuration?
***********************************************************************
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
***********************************************************************

I scanned the code for SimpleDataWriterBuilder, which uses SimpleDataWriter to write the byte array to HDFS (I don't see the HDFS configuration, so I assume that the file is only in HDFS format and would be copied to the Hadoop file system after it is created).


It seems that you want to store the key/value in JSON format (or some other format) and not the way it is done by the default SimpleDataWriter. You may look at configuring the KafkaDeserializerExtractor with the required deserializer type, such as JSON (or other).

Once this is done, you can use KafkaSimpleJsonExtractor, which was asked about at the beginning.

So to conclude, what I understand is that you don't want the data to be stored as a byte array; you require JSON (or another format).





On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <Ja...@comcast.com>> wrote:
Thank you for your time spent on our question.

This quick start document,
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”. That class overrides method, getExtractor, to return “new KafkaSimpleExtractor(state)”, and class, KafkaSimpleExtractor, overrides method, decodeRecord, to return just the Kafka message value.

  return kafkaConsumerRecord.getMessageBytes()

Since we want both key and value stored in HDFS, class, KafkaSimpleJsonExtractor, appears to provide the desired decodeRecord method:

protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
    long offset = messageAndOffset.getOffset();

    byte[] keyBytes = messageAndOffset.getKeyBytes();
    String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

    byte[] payloadBytes = messageAndOffset.getMessageBytes();
    String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

    KafkaRecord record = new KafkaRecord(offset, key, payload);

    byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
    return decodedRecord;
}
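
To show the shape of the record that method produces, here is a rough, dependency-free sketch of the same idea: decode key and payload bytes, treat null as an empty string, and emit one JSON object. The class KeyValueJsonSketch and its hand-written escaping are made up for illustration; the real class uses Gson. The JSON field names (offset, key, payload) and the UTF-8 charset are assumptions based on the constructor arguments and imports visible in this thread.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not Gobblin code.
class KeyValueJsonSketch {
    // Escapes the characters that must not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds the JSON text; a null key or payload becomes an empty string.
    static String toJson(long offset, byte[] keyBytes, byte[] payloadBytes) {
        String key = (keyBytes == null) ? "" : new String(keyBytes, StandardCharsets.UTF_8);
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, StandardCharsets.UTF_8);
        return "{\"offset\":" + offset
            + ",\"key\":\"" + escape(key) + "\""
            + ",\"payload\":\"" + escape(payload) + "\"}";
    }

    // Mirrors the byte[] return type of decodeRecord.
    static byte[] decodeRecord(long offset, byte[] keyBytes, byte[] payloadBytes) {
        return toJson(offset, keyBytes, payloadBytes).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = "42".getBytes(StandardCharsets.UTF_8);
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(toJson(7L, key, value));
    }
}
```

With a record like this, a downstream writer that stores raw bytes would end up storing both the key and the value, since both travel inside the one JSON document.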

The architecture diagram leads us to think that the extractor is the point where both Kafka key and value are visible in the Gobblin pipeline:
https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.


From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

For writing the data to HDFS you need to take a look at the Writer implementation; please look for it. The Extractor is used to pull the data, which is thereafter passed through various stages before it is stored via the Writer.
What I meant by the other subscriber was to add one more consumer that reads the key/value, e.g. kafka-console-consumer.sh, which comes with Kafka.
I was not aware earlier that you wanted to store the data in HDFS; you had only given the reference to the Extractor, so it was not clear to me.
Please take a look at the Kafka to HDFS writer and see if that helps. If it does not do exactly what you require, you may have to plug in a customized writer.


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com>> wrote:
We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky.kak@gmail.com<ma...@gmail.com>]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org<ma...@gobblin.incubator.apache.org>
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done?
Why can't you add another subscriber to Kafka that renders the values, rather than having the KafkaExtractor implementation render them?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com>> wrote:
Hi,

Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java in order to see both kafka record keys and values.

How does one configure a job to achieve it?









Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
Great, this looks fine to me. Can you raise a JIRA with the issue
description and thereafter raise a PR for this change?
Thanks,
Vicky

>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadConverterTest.testConverter PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadExtractingConverterTest. STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.
> EnvelopePayloadExtractingConverterTest. PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> writer.KafkaWriterHelperTest.testSharedConfig STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> writer.KafkaWriterHelperTest.testSharedConfig PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest.
> STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest.
> PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testInvalidString STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testInvalidString PASSED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testValidString STARTED
>
>
>
> Gradle suite > Gradle test > org.apache.gobblin.kafka.
> serialize.MD5DigestTest.testValidString PASSED
>
>
>
> BUILD SUCCESSFUL
>
>
>
> Total time: 37.075 secs
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Wednesday, April 25, 2018 11:56 PM
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> It is nice to hear that you got it working.
>
> Could you please post the details of the error that you are seeing in the
> Windows build? Maybe someone can fix it. Also, on Windows you could try
> building with Cygwin; I am not sure how you were doing it. I personally
> use Ubuntu for development.
>
> We may need to get this change committed. Can you run the tests too and
> see whether they all pass with these changes?
>
> Lastly, move to the latest Gobblin distribution. You may have to rework the
> patch, as the package names changed after the project moved to Apache incubation.
>
>
>
> Thanks,
>
> Vicky
>
>
>
>
>
> On Thu, Apr 26, 2018 at 3:48 AM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> I set up a Fedora workstation, where Gobblin built on the first try. The
> build was failing under Windows.
>
> I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution,
> installed it, and ran it. It works with these job configs:
>
>
>
> source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
>
> gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
>
>
>
>
>
> Can this patch be applied to the repository:
>
>
>
> diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> index 9001df134..d0813f9a3 100644
> --- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
> @@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;
>
> import com.google.gson.Gson;
>
> +import gobblin.annotation.Alias;
> import gobblin.configuration.WorkUnitState;
> import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
> import gobblin.source.extractor.Extractor;
>
> +@Alias("KafkaSimpleJsonExtractor")
> public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {
>
>      private static final Gson gson = new Gson();
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 19, 2018 11:31 PM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Which script are you using for the build? Are you not using one of these?
>
>
>
>    1. Skip tests and build the distribution: Run ./gradlew build -x
>    findbugsMain -x test -x rat -x checkstyleMain The distribution will be
>    created in build/gobblin-distribution/distributions directory. (or)
>    2. Run tests and build the distribution (requires Maven): Run ./gradlew
>    build The distribution will be created in build/gobblin-distribution/distributions
>    directory.
>
> These are taken from  here https://github.com/
> apache/incubator-gobblin/blob/master/README.md
>
>
>
> I have not had a chance to add the Alias annotation yet; I am a bit busy this
> weekend. I will try to find some time and see if I can test it.
>
>
>
>
>
>
>
>
>
> On Fri, Apr 20, 2018 at 5:58 AM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> So far I have failed to build gobblin. I’m currently stuck on this error:
>
>
>
> FAILURE: Build failed with an exception.
>
>
>
> * Where:
>
> Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32
>
>
>
> * What went wrong:
>
> A problem occurred evaluating script.
>
> > Failed to apply plugin [id 'org.gradle.java']
>
>    > Value is null
>
>
>
>
>
> Did you have a chance to look into adding the annotation
> @Alias("KafkaSimpleJsonExtractor")?
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Saturday, April 14, 2018 12:19 AM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Yes, you seem to be correct; the code never lies :)
>
>
>
> Can you try implementing your own customised Extractor that extends
> KafkaSimpleJsonExtractor and carries the Alias annotation?
>
> The other option would be to add the Alias annotation to
> KafkaSimpleJsonExtractor itself. I need to see what implications that would
> have and why it was not done earlier. I need some time to look deeper into
> it; I hope to find some during the weekend.
>
>
>
>
>
>
>
>
>
> On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> Our correspondence helps with understanding Gobblin. ☺
>
> We’re focusing on the source, so no writers are involved yet. We want to
> extract both the key and value from each Kafka message.
>
>
>
> There are three classes that extend abstract class, KafkaSource:
>
> KafkaDeserializerSource,
>
> KafkaSimpleSource and
>
> UniversalKafkaSource.
>
>
>
> Each has method, getExtractor(), that returns these respective instances:
>
>
>
> KafkaDeserializerExtractor;
>
> KafkaSimpleExtractor and
>
> a class found by ClassAliasResolver.resolveClass.
>
>
>
> The first two have a decodeRecord() method that takes a
> ByteArrayBasedKafkaRecord parameter, and each calls
> ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls
> ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t give us the key.
>
>
>
> The UniversalKafkaSource class looks interesting because it will search
> for a class that extends KafkaExtractor and instantiate it. But there’s a
> catch: the class must be annotated with @Alias. If class
> KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use
> UniversalKafkaSource along with the property gobblin.source.kafka.extractorType.
> As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both
> ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(),
> which is the result we seek.
>
>
>
> Or did we miss something?
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Friday, April 13, 2018 12:01 AM
>
>
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> Do you not have this configuration?
>
> ***********************************************************************
>
> writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
>
> writer.file.path.type=tablename
>
> writer.destination.type=HDFS
>
> writer.output.format=txt
>
> ***********************************************************************
>
>
> I scanned the code for SimpleDataWriterBuilder, which uses
> SimpleDataWriter to write the byte array to HDFS. (I don't see the
> HDFS configuration, so I assume that the file is only in HDFS format and
> would be copied to the Hadoop file system after it is created.)
>
>
>
>
>
> It seems that you want to store the key/value in JSON format (or some
> other format) and not the way the default SimpleDataWriter does it.
> You may want to look at configuring KafkaDeserializerExtractor with the
> required deserialized type, such as JSON (or another).
>
>
>
> Once this is done, you can use KafkaSimpleJsonExtractor, which is what you
> asked about at the beginning.
>
> So, to conclude, my understanding is that you don't want the data to be
> stored as a byte array; you require JSON (or another format).
>
>
>
>
>
>
>
>
>
>
>
> On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> Thank you for your time spent on our question.
>
>
>
> This quick start document,
>
> https://gobblin.readthedocs.io/en/latest/case-studies/
> Kafka-HDFS-Ingestion/,
>
> shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”.
> That class overrides method, getExtractor, to return “new
> KafkaSimpleExtractor(state)”, and class, KafkaSimpleExtractor, overrides
> method, decodeRecord, to return just the Kafka message value.
>
>
>
>   return kafkaConsumerRecord.getMessageBytes()
>
>
>
> Since we want both key and value stored in HDFS, class,
> KafkaSimpleJsonExtractor, appears to provide the desired decodeRecord
> method:
>
>
>
> protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
>         long offset = messageAndOffset.getOffset();
>
>         byte[] keyBytes = messageAndOffset.getKeyBytes();
>         String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
>
>         byte[] payloadBytes = messageAndOffset.getMessageBytes();
>         String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
>
>         KafkaRecord record = new KafkaRecord(offset, key, payload);
>
>         byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
>         return decodedRecord;
>     }
>
>
>
> The architecture diagram leads us to think that the extractor is the point
> where both the Kafka key and value are visible in the Gobblin pipeline:
>
> https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-
> constructs
>
>
>
> We are eager to learn from you.
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 12, 2018 9:45 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> For writing the data to HDFS you need to take a look at the Writer
> implementation; please look for it. The Extractor is used for pulling the
> data, which is thereafter passed through various stages before it is stored
> via the Writer.
>
> What I meant by the other subscriber was to add one more consumer that
> will read the key/value, e.g. kafka-console-consumer.sh, which comes with
> Kafka.
>
> I was not aware earlier that you wanted to store the data in HDFS; since you
> had only given the reference to the Extractor, it was not clear to me.
>
> Please take a look at the Kafka to HDFS writer and see if that helps. In
> case it does not do exactly what you require, you may have to
> plug in a customized writer.
>
>
>
>
>
> On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> We want to extract both key and value from a Kafka message and publish the
> combined information to HDFS. Keys contain numeric ids for the strings
> contained in values.
>
>
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
>
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have another subscriber to Kafka that renders the
> values, rather than having the KafkaExtractor implementation render them?
>
>
>
>
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <
> Jack_Kidwelljr@comcast.com> wrote:
>
> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java
> in order to see both kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>

RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
I wiped my Windows directory, but because I was using tags/gobblin_0.10.0 it’s pointless to troubleshoot it.

On my Fedora workstation, I’m using tags/release-0.12.0-rc2, and patched
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java.

The tests pass. How do I get this trivial change in? Do I make a branch and a pull request?

Here’s the output:

./gradlew :gobblin-modules:gobblin-kafka-common:test
Parallel execution with configuration on demand is an incubating feature.
:buildSrc:compileJava UP-TO-DATE
:buildSrc:compileGroovy UP-TO-DATE
:buildSrc:processResources UP-TO-DATE
:buildSrc:classes UP-TO-DATE
:buildSrc:jar UP-TO-DATE
:buildSrc:assemble UP-TO-DATE
:buildSrc:compileTestJava UP-TO-DATE
:buildSrc:compileTestGroovy UP-TO-DATE
:buildSrc:processTestResources UP-TO-DATE
:buildSrc:testClasses UP-TO-DATE
:buildSrc:test UP-TO-DATE
:buildSrc:check UP-TO-DATE
:buildSrc:build UP-TO-DATE
Build property: gobblinFlavor=standard
Build property: jdkVersion=1.8
Build property: sonatypeArtifactRepository=https://oss.sonatype.org/service/local/staging/deploy/maven2/
Build property: sonatypeArtifactSnapshotRepository=https://oss.sonatype.org/content/repositories/snapshots/
Build property: nexusArtifactRepository=https://repository.apache.org/service/local/staging/deploy/maven2
Build property: nexusArtifactSnapshotRepository=https://repository.apache.org/content/repositories/snapshots
Build property: doNotSignArtifacts=false
Build property: avroVersion=1.8.1
Build property: awsVersion=1.11.8
Build property: bytemanVersion=2.2.1
Build property: confluentVersion=2.0.1
Build property: hadoopVersion=2.3.0
Build property: hiveVersion=1.0.1
Build property: kafka08Version=0.8.2.2
Build property: kafka09Version=0.9.0.1
Build property: pegasusVersion=11.0.0
Build property: salesforceVersion=37.0.3
Detected Gradle version major=2 minor=13
Build property: publishToMaven=false
Build property: publishToNexus=false
Release Version: 0.12.0
:gobblin-api:compileJava
:gobblin-utility:processResources
:gobblin-metrics-libs:gobblin-metrics-base:generateAvro UP-TO-DATE
:gobblin-metrics-libs:gobblin-metrics-base:processResources
:gobblin-modules:gobblin-metrics-graphite:processResources UP-TO-DATE
:gobblin-modules:gobblin-metrics-hadoop:processResources UP-TO-DATE
:gobblin-modules:gobblin-metrics-influxdb:processResources UP-TO-DATE
:gobblin-metrics-libs:gobblin-metrics:processResources UP-TO-DATE
:gobblin-modules:gobblin-codecs:processResources UP-TO-DATE
:gobblin-core-base:processResources UP-TO-DATE
:gobblin-test-utils:processResources
:gobblin-config-management:gobblin-config-core:processResources
:gobblin-config-management:gobblin-config-client:processResources UP-TO-DATE
:gobblin-modules:gobblin-kafka-common:processResources UP-TO-DATE
:gobblin-modules:gobblin-kafka-common:processTestResources
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/runtime/BasicTestControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/MetadataUpdateControlMessage.java:32: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
/home/spruitt/git/incubator-gobblin/gobblin-api/src/main/java/org/apache/gobblin/stream/FlushControlMessage.java:31: warning: Generating equals/hashCode implementation but without a call to superclass, even though this class does not extend java.lang.Object. If this is intentional, add '@EqualsAndHashCode(callSuper=false)' to your type.
@EqualsAndHashCode
^
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
3 warnings
:gobblin-api:processResources UP-TO-DATE
:gobblin-api:classes
:gobblin-api:jar
:gobblin-utility:compileJava
:gobblin-modules:gobblin-codecs:compileJava
:gobblin-modules:gobblin-codecs:classes
:gobblin-modules:gobblin-codecs:jar
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
:gobblin-utility:classes
:gobblin-utility:jar
:gobblin-metrics-libs:gobblin-metrics-base:compileJava
:gobblin-test-utils:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-test-utils/src/main/java/org/apache/gobblin/test/TestUtils.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

:gobblin-test-utils:classes
:gobblin-test-utils:jar
:gobblin-config-management:gobblin-config-core:compileJava
:gobblin-config-management:gobblin-config-core:classes
:gobblin-config-management:gobblin-config-core:jar
:gobblin-config-management:gobblin-config-client:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-metrics-libs:gobblin-metrics-base:classes
:gobblin-metrics-libs:gobblin-metrics-base:jar
:gobblin-modules:gobblin-metrics-graphite:compileJava
:gobblin-modules:gobblin-metrics-hadoop:compileJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-graphite/src/main/java/org/apache/gobblin/metrics/graphite/GraphiteEventReporter.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

:gobblin-modules:gobblin-metrics-graphite:classes
:gobblin-modules:gobblin-metrics-graphite:jar
:gobblin-modules:gobblin-metrics-influxdb:compileJava
:gobblin-modules:gobblin-metrics-hadoop:classes
:gobblin-modules:gobblin-metrics-hadoop:jar
:gobblin-config-management:gobblin-config-client:classes
:gobblin-config-management:gobblin-config-client:jar
Note: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-metrics-influxdb/src/main/java/org/apache/gobblin/metrics/influxdb/InfluxDBEventReporter.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
:gobblin-modules:gobblin-metrics-influxdb:classes
:gobblin-modules:gobblin-metrics-influxdb:jar
:gobblin-metrics-libs:gobblin-metrics:compileJava
:gobblin-metrics-libs:gobblin-metrics:classes
:gobblin-metrics-libs:gobblin-metrics:jar
:gobblin-core-base:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-core-base:classes
:gobblin-core-base:jar
:gobblin-modules:gobblin-kafka-common:compileJavaNote: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-modules:gobblin-kafka-common:classes
:gobblin-modules:gobblin-kafka-common:compileTestJavaNote: /home/spruitt/git/incubator-gobblin/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/reporter/KafkaAvroEventReporterTest.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
Note: Some input files use unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.

:gobblin-modules:gobblin-kafka-common:testClasses
:gobblin-modules:gobblin-kafka-common:test

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testIdSchemaCaching PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testMaxReferences PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterSchemaCaching PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.schemareg.CachingKafkaSchemaRegistryTest.testRegisterShouldCacheIds PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadConverterTest.testConverter PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopePayloadExtractingConverterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.converter.EnvelopeSchemaConverterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaEventReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaAvroReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.reporter.KafkaReporterTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.writer.KafkaWriterHelperTest.testSharedConfig PASSED

Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. STARTED

Gradle suite > Gradle test > org.apache.gobblin.metrics.kafka.LoggingPusherTest. PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testInvalidString PASSED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString STARTED

Gradle suite > Gradle test > org.apache.gobblin.kafka.serialize.MD5DigestTest.testValidString PASSED

BUILD SUCCESSFUL

Total time: 37.075 secs


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Wednesday, April 25, 2018 11:56 PM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

It is nice to hear that you got it working.
Could you please post the details of the error that you are seeing in the Windows build? Maybe someone can fix it. Also, on Windows you could try building with Cygwin; I am not sure how you were doing it. I personally use Ubuntu for development.
We may need to get this change committed. Can you run the tests too and see whether they all pass with these changes?
Lastly, move to the latest Gobblin distribution. You may have to rework the patch, as the package names changed after the project moved to Apache incubation.

Thanks,
Vicky


On Thu, Apr 26, 2018 at 3:48 AM, Kidwell, Jack <Ja...@comcast.com> wrote:
I set up a Fedora workstation, where Gobblin built on the first try. The build was failing under Windows.

I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution, installed it, and ran it. It works with these job configs:

source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
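
Editorial sketch of why the extractorType value only works once the class carries an alias. This is not Gobblin's code: the Alias annotation, the two extractor stand-ins, and the resolve() helper below are all hypothetical, and the real ClassAliasResolver may discover candidates differently. It only illustrates the idea of matching a configured name against an annotation value.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

public class AliasLookupSketch {

    // Hypothetical stand-in for Gobblin's Alias annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Alias {
        String value();
    }

    // Hypothetical stand-ins for extractor classes; only one carries an alias.
    static class PlainExtractor {
    }

    @Alias("KafkaSimpleJsonExtractor")
    static class JsonExtractor extends PlainExtractor {
    }

    // Candidate classes to scan, hard-coded here purely for illustration.
    static final List<Class<?>> CANDIDATES =
        Arrays.asList(PlainExtractor.class, JsonExtractor.class);

    // Returns the candidate whose alias equals the configured name,
    // or null when no annotated candidate matches.
    static Class<?> resolve(String configuredName) {
        for (Class<?> candidate : CANDIDATES) {
            Alias alias = candidate.getAnnotation(Alias.class);
            if (alias != null && alias.value().equals(configuredName)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The annotated class is found by its alias.
        System.out.println(resolve("KafkaSimpleJsonExtractor"));
        // A class without the annotation cannot be found by name.
        System.out.println(resolve("PlainExtractor"));
    }
}
```

In this sketch a class without the annotation is invisible to the lookup, which mirrors the behaviour described in this thread for the unpatched KafkaSimpleJsonExtractor.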


Can this patch be applied to the repository:

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
index 9001df134..d0813f9a3 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
@@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;

+import gobblin.annotation.Alias;
import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import gobblin.source.extractor.Extractor;

+@Alias("KafkaSimpleJsonExtractor")
public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {

     private static final Gson gson = new Gson();

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 19, 2018 11:31 PM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Which script are you using for the build? Are you not using one of these?


  1.  Skip tests and build the distribution: run ./gradlew build -x findbugsMain -x test -x rat -x checkstyleMain. The distribution will be created in the build/gobblin-distribution/distributions directory. (or)
  2.  Run tests and build the distribution (requires Maven): run ./gradlew build. The distribution will be created in the build/gobblin-distribution/distributions directory.
These are taken from https://github.com/apache/incubator-gobblin/blob/master/README.md

I have not had a chance to add the Alias annotation yet; I am a bit busy this weekend. I will try to find some time and see if I can test it.




On Fri, Apr 20, 2018 at 5:58 AM, Kidwell, Jack <Ja...@comcast.com> wrote:
So far I have failed to build Gobblin. I’m currently stuck on this error:

FAILURE: Build failed with an exception.

* Where:
Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32

* What went wrong:
A problem occurred evaluating script.
> Failed to apply plugin [id 'org.gradle.java']
   > Value is null


Did you have a chance to research adding the annotation @Alias("KafkaSimpleJsonExtractor")?


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Saturday, April 14, 2018 12:19 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Yes, you seem to be correct; the code never lies :)

Can you try implementing your own customised Extractor that extends KafkaSimpleJsonExtractor and carries the Alias annotation?

The other option would be to add the Alias annotation to KafkaSimpleJsonExtractor itself. I need to see what implications that would have and why it was not done earlier. I need some time to look deeper into it; I hope to find some during the weekend.




On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <Ja...@comcast.com> wrote:
Our correspondence helps with understanding Gobblin. ☺

We’re focusing on the source, so no writers are involved yet. We want to extract both the key and value from each Kafka message.

There are three classes that extend the abstract class KafkaSource:
KafkaDeserializerSource,
KafkaSimpleSource and
UniversalKafkaSource.

Each has a method, getExtractor(), that returns these respective instances:

KafkaDeserializerExtractor,
KafkaSimpleExtractor and
a class found by ClassAliasResolver.resolveClass().

The first two have a method, decodeRecord(), that takes a ByteArrayBasedKafkaRecord parameter, and each calls ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work for us.

The UniversalKafkaSource class looks interesting because it will search for a class that extends KafkaExtractor and instantiate it. But there’s a catch: the class must be annotated with @Alias. If class KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use UniversalKafkaSource along with the property gobblin.source.kafka.extractorType. As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(), which is the result we seek.

Or did we miss something?
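
The alias lookup described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not Gobblin's actual ClassAliasResolver: the Alias annotation, the BaseExtractor hierarchy and the explicit candidate list are all hypothetical stand-ins, and the fallback to a fully qualified class name is an assumption about how such a resolver would typically behave.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

final class AliasLookupSketch {

    // Hypothetical stand-in for an alias annotation; it must be retained at
    // runtime so that reflection can read it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Alias {
        String value();
    }

    // Hypothetical stand-ins for the extractor class hierarchy.
    static class BaseExtractor { }

    @Alias("KafkaSimpleJsonExtractor")
    static class JsonExtractor extends BaseExtractor { }

    // Carries no @Alias, so its short name cannot be resolved.
    static class UnaliasedExtractor extends BaseExtractor { }

    // Candidate classes are listed by hand here; a real resolver would
    // discover them by scanning the classpath.
    static List<Class<? extends BaseExtractor>> candidates() {
        List<Class<? extends BaseExtractor>> list = new ArrayList<>();
        list.add(JsonExtractor.class);
        list.add(UnaliasedExtractor.class);
        return list;
    }

    // Returns the candidate whose @Alias value matches the name; otherwise
    // treats the name as a fully qualified class name (assumed fallback).
    static Class<? extends BaseExtractor> resolve(
            String name, List<Class<? extends BaseExtractor>> candidates) {
        for (Class<? extends BaseExtractor> candidate : candidates) {
            Alias alias = candidate.getAnnotation(Alias.class);
            if (alias != null && alias.value().equals(name)) {
                return candidate;
            }
        }
        try {
            return Class.forName(name).asSubclass(BaseExtractor.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No class or alias named " + name, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("KafkaSimpleJsonExtractor", candidates()).getSimpleName());
        try {
            resolve("UnaliasedExtractor", candidates());
        } catch (IllegalArgumentException e) {
            System.out.println("not resolvable without an alias: " + e.getMessage());
        }
    }
}
```

In this sketch the annotated class is found by its short name, while the class without an annotation can only be reached through its fully qualified name, which is consistent with the catch described in the message.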

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Friday, April 13, 2018 12:01 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Do you not have this configuration?
***********************************************************************
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
***********************************************************************

I scanned the code for SimpleDataWriterBuilder, which uses SimpleDataWriter to write the byte array to HDFS. (I don't see the HDFS configuration, so I assume that the file is only in HDFS format and would be copied to the Hadoop file system after it is created.)


It seems that you want to store the key/value in JSON format (or some other format), not the way the default SimpleDataWriter does it. You may look into configuring the KafkaDeserializerExtractor with the required deserializer type, such as JSON (or another).

Once this is done, you can use KafkaSimpleJsonExtractor, which you asked about at the beginning.

So, to conclude, what I understand is that you don't want the data stored as a byte array; you require JSON (or another format).





On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Thank you for your time spent on our question.

This quick start document,
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”. That class overrides the getExtractor method to return “new KafkaSimpleExtractor(state)”, and class KafkaSimpleExtractor overrides the decodeRecord method to return just the Kafka message value:

  return kafkaConsumerRecord.getMessageBytes();

Since we want both key and value stored in HDFS, class KafkaSimpleJsonExtractor appears to provide the desired decodeRecord method:

protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
        long offset = messageAndOffset.getOffset();

        byte[] keyBytes = messageAndOffset.getKeyBytes();
        String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

        byte[] payloadBytes = messageAndOffset.getMessageBytes();
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

        KafkaRecord record = new KafkaRecord(offset, key, payload);

        byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
        return decodedRecord;
    }
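
To make the shape of the resulting record concrete, here is a minimal, dependency-free sketch of the same idea. It is not the Gobblin implementation: Gson is replaced by a small hand-rolled formatter so the example runs with the JDK alone, CHARSET is assumed to be UTF-8, and the JSON field names (offset, key, payload) are assumed from the KafkaRecord constructor arguments shown above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class KeyValueRecordSketch {

    // Assumption: the extractor's CHARSET constant is UTF-8.
    private static final Charset CHARSET = StandardCharsets.UTF_8;

    // Minimal JSON string quoting: escapes quotes, backslashes and control
    // characters only. A real JSON library handles more cases.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char ch : s.toCharArray()) {
            if (ch == '"' || ch == '\\') {
                sb.append('\\').append(ch);
            } else if (ch < 0x20) {
                sb.append(String.format("\\u%04x", (int) ch));
            } else {
                sb.append(ch);
            }
        }
        return sb.append('"').toString();
    }

    // Mirrors the structure of decodeRecord above: a null key or payload
    // becomes an empty string, then offset, key and payload are combined
    // into one JSON document and returned as bytes.
    static byte[] decodeRecord(long offset, byte[] keyBytes, byte[] payloadBytes) {
        String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);
        String json = "{\"offset\":" + offset
                + ",\"key\":" + quote(key)
                + ",\"payload\":" + quote(payload) + "}";
        return json.getBytes(CHARSET);
    }

    public static void main(String[] args) {
        byte[] out = decodeRecord(42L, "17".getBytes(CHARSET), "hello".getBytes(CHARSET));
        // prints {"offset":42,"key":"17","payload":"hello"}
        System.out.println(new String(out, CHARSET));
    }
}
```

The point of the sketch is only that the key travels inside the same byte array as the value, so a writer that stores raw bytes would persist both.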

The architecture diagram leads us to think that the extractor is the point where both the Kafka key and value are visible in the Gobblin pipeline:
https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

To write the data to HDFS you need to look at the Writer implementation. The Extractor is used to pull the data, which is thereafter passed through various stages before it is stored via the Writer.
What I meant by the other subscriber was to add one more consumer that reads the key/value, e.g. kafka-console-consumer.sh, which comes with Kafka.
I was not aware earlier that you wanted to store the data in HDFS; you had referred only to the Extractor, so it was not clear to me.
Please take a look at the Kafka-to-HDFS writer and see if that helps. If it does not do exactly what you require, you may have to plug in a customized writer.


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done?
Why not have another subscriber to Kafka render the values, rather than having the KafkaExtractor implementation render them?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Hi,

Using gobblin_0.10.0, we want to use the module KafkaSimpleJsonExtractor.java in order to see both Kafka record keys and values.

How does one configure a job to achieve it?








Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
It is nice to hear that you got it working.
Could you please post the details of the error that you are seeing in the
Windows build? Maybe someone can fix it. Also, on Windows you could try
building using Cygwin; I am not sure how you were doing it. I am personally
using Ubuntu for development.
We may need to get this change committed. Can you run the tests too and
see whether the change passes all of them?
Lastly, move to the latest Gobblin distribution. You may have to rework the
patch, as the package names changed after the project moved to Apache incubation.

Thanks,
Vicky
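
On the package rename mentioned above: elsewhere in this thread the 0.10.0 class names start with "gobblin." while the newer ones start with "org.apache.gobblin.". A small sketch of carrying a job configuration across that rename might look like the following. The helper names are hypothetical, and it assumes that only values of keys ending in ".class" hold class names and that property keys themselves are unchanged; both assumptions should be checked against the target release.

```java
import java.util.Properties;

final class PackageRenameSketch {

    static final String OLD_PREFIX = "gobblin.";
    static final String NEW_PREFIX = "org.apache.gobblin.";

    // Rewrites a pre-incubation class name; names that already use the new
    // prefix (or any other package) are returned unchanged.
    static String migrateClassName(String className) {
        return className.startsWith(OLD_PREFIX)
                ? NEW_PREFIX + className.substring(OLD_PREFIX.length())
                : className;
    }

    // Copies the job properties, rewriting only values whose key ends in
    // ".class" (assumption: those are the keys that carry class names).
    static Properties migrate(Properties job) {
        Properties out = new Properties();
        for (String key : job.stringPropertyNames()) {
            String value = job.getProperty(key);
            out.setProperty(key, key.endsWith(".class") ? migrateClassName(value) : value);
        }
        return out;
    }

    public static void main(String[] args) {
        Properties job = new Properties();
        job.setProperty("source.class",
                "gobblin.source.extractor.extract.kafka.UniversalKafkaSource");
        job.setProperty("gobblin.source.kafka.extractorType", "KafkaSimpleJsonExtractor");

        Properties migrated = migrate(job);
        System.out.println(migrated.getProperty("source.class"));
        System.out.println(migrated.getProperty("gobblin.source.kafka.extractorType"));
    }
}
```

Because the extractor is selected by its alias rather than by a class name, the extractorType value would not need to change in this sketch.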



RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
I set up a Fedora workstation, where Gobblin built on the first try. The build was failing under Windows.

I patched KafkaSimpleJsonExtractor.java, built the 0.10.0 distribution, installed it, and ran it. It works with these job configs:

source.class=gobblin.source.extractor.extract.kafka.UniversalKafkaSource
gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor


Can this patch be applied to the repository?

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
index 9001df134..d0813f9a3 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/gobblin/source/extractor/extract/kafka/KafkaSimpleJsonExtractor.java
@@ -23,10 +23,12 @@ import java.nio.charset.StandardCharsets;

import com.google.gson.Gson;

+import gobblin.annotation.Alias;
import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import gobblin.source.extractor.Extractor;

+@Alias("KafkaSimpleJsonExtractor")
public class KafkaSimpleJsonExtractor extends KafkaSimpleExtractor implements Extractor<String, byte[]> {

     private static final Gson gson = new Gson();


Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
Which script are you using to make the build? Are you not using one of these?


   1. Skip tests and build the distribution: run ./gradlew build -x
   findbugsMain -x test -x rat -x checkstyleMain. The distribution will be
   created in the build/gobblin-distribution/distributions directory. (or)
   2. Run tests and build the distribution (requires Maven): run ./gradlew
   build. The distribution will be created in the
   build/gobblin-distribution/distributions directory.

These are taken from
https://github.com/apache/incubator-gobblin/blob/master/README.md

I did not get a chance to add the Alias annotation; I am a bit busy this
weekend. I will try to find some time and see if I can test it.




> On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> Thank you for your time spent on our question.
>
>
>
> This quick start document,
>
> https://gobblin.readthedocs.io/en/latest/case-studies/
> Kafka-HDFS-Ingestion/,
>
> shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”.
> That class overrides method, getExtractor, to return “new
> KafkaSimpleExtractor(state)”, and class, KafkaSimpleExtractor, overrides
> method, decodeRecord, to return just the Kafka message value.
>
>
>
>   return kafkaConsumerRecord.getMessageBytes()
>
>
>
> Since we want both key and value stored in HDFS, class,
> KafkaSimpleJsonExtractor, appears to provide the desired decodeRecord
> method:
>
>
>
> protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset)
> throws IOException {
>
>         long offset = messageAndOffset.getOffset();
>
>
>
>         byte[] keyBytes = messageAndOffset.getKeyBytes();
>
>         String key = (keyBytes == null) ? "" : new String(keyBytes,
> CHARSET);
>
>
>
>         byte[] payloadBytes = messageAndOffset.getMessageBytes();
>
>         String payload = (payloadBytes == null) ? "" : new
> String(payloadBytes, CHARSET);
>
>
>
>         KafkaRecord record = new KafkaRecord(offset, key, payload);
>
>
>
>         byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
>
>         return decodedRecord;
>
>     }
>
>
>
> The architecture diagram leads us to think that the exactor is the point
> where both Kafka key and value are visible in the Gobblin pipeline:
>
> https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-
> constructs
>
>
>
> We are eager to learn from you.
>
>
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 12, 2018 9:45 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> For writing the data to the HDFS you need to take a look at the Writer
> Implementation, please look for it. The Extractor is used in pulling the
> data which is thereafter passed through various stages before it is stored
> via the Writer.
>
> What I mean by the other subscriber was to add one more consumer which
> will  read the key/value e.g kafka-console-consumer.sh which comes with
> the kafka.
>
> I was not earlier aware that you wanted to store the data in HFDS as you
> had given the reference of the Extractor so it was not clear to me.
>
> Please take a look at the Kafka to HDFS writer and see if that helps, in
> case it is not doing what is exactly required by you then you may have to
> plugin the customized writer.
>
>
>
>
>
> On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com>
> wrote:
>
> We want to extract both key and value from a Kafka message and publish the
> combined information to HDFS. Keys contain numeric ids for the strings
> contained in values.
>
>
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
>
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have the other subscriber to the kafka which will render the
> values rather than KafkaExtractor implementation rendering the same?
>
>
>
>
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <
> Jack_Kidwelljr@comcast.com> wrote:
>
> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java
> in order to see both kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>
>
>
>
>
>
>
>
>
>
>

RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
So far I have failed to build Gobblin. I’m currently stuck on this error:

FAILURE: Build failed with an exception.

* Where:
Script 'incubator-gobblin/gradle/scripts/idesSetup.gradle' line: 32

* What went wrong:
A problem occurred evaluating script.
> Failed to apply plugin [id 'org.gradle.java']
   > Value is null


Did you have a chance to research adding the annotation @Alias("KafkaSimpleJsonExtractor")?


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Saturday, April 14, 2018 12:19 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Yes, you seem to be correct; the code never lies :)

Can you try implementing a customised Extractor that extends KafkaSimpleJsonExtractor and carries the Alias annotation?

The other option would be to add the Alias annotation to KafkaSimpleJsonExtractor itself. I need to see what implications that would have and why it was not done earlier. I need some time to look deeper into it and hope to find some during the weekend.




On Sat, Apr 14, 2018 at 2:41 AM, Kidwell, Jack <Ja...@comcast.com> wrote:
Our correspondence helps with understanding Gobblin. ☺

We’re focusing on the source, so no writers are involved yet. We want to extract both the key and the value from each Kafka message.

Three classes extend the abstract class KafkaSource:
KafkaDeserializerSource,
KafkaSimpleSource and
UniversalKafkaSource.

Each has a getExtractor() method, and these return, respectively:

KafkaDeserializerExtractor,
KafkaSimpleExtractor and
a class found by ClassAliasResolver.resolveClass.

The first two have a decodeRecord() method that takes a ByteArrayBasedKafkaRecord parameter, and each calls ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work for us.

The UniversalKafkaSource class looks interesting because it will search for a class that extends KafkaExtractor and instantiate it. But there’s a catch: the class must be annotated with @Alias. If class KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use UniversalKafkaSource along with the property gobblin.source.kafka.extractorType. As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(), which is the result we seek.

Or did we miss something?

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Friday, April 13, 2018 12:01 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

Do you not have this configuration?
***********************************************************************
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
***********************************************************************

I scanned the code for SimpleDataWriterBuilder, which uses the SimpleDataWriter that writes the byte array to HDFS (I don't see the HDFS configuration, so I assume that the file is only in HDFS format and would be copied to the Hadoop file system after it is created).


It seems that you want to store the key/value in JSON format (or some other format) and not the way the default SimpleDataWriter does it. You may look into configuring the KafkaDeserializerExtractor with the required deserializer type, JSON or otherwise.

Once this is done, you can use the KafkaSimpleJsonExtractor that you asked about at the beginning.

So, to conclude, what I understand is that you don't want the data to be stored as a byte array; you require JSON (or another format).





On Thu, Apr 12, 2018 at 8:09 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Thank you for the time you spent on our question.

This quick start document,
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”. That class overrides the getExtractor method to return “new KafkaSimpleExtractor(state)”, and class KafkaSimpleExtractor overrides the decodeRecord method to return just the Kafka message value:

  return kafkaConsumerRecord.getMessageBytes();

Since we want both key and value stored in HDFS, class KafkaSimpleJsonExtractor appears to provide the desired decodeRecord method:

protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
    long offset = messageAndOffset.getOffset();

    byte[] keyBytes = messageAndOffset.getKeyBytes();
    String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

    byte[] payloadBytes = messageAndOffset.getMessageBytes();
    String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

    KafkaRecord record = new KafkaRecord(offset, key, payload);

    byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
    return decodedRecord;
}

The architecture diagram leads us to think that the extractor is the point where both the Kafka key and value are visible in the Gobblin pipeline:
https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

To write the data to HDFS, you need to look at the Writer implementation. The Extractor pulls the data, which then passes through various stages before the Writer stores it.
What I meant by the other subscriber was to add one more consumer that reads the key/value, e.g. kafka-console-consumer.sh, which comes with Kafka.
I was not aware earlier that you wanted to store the data in HDFS; you had only referred to the Extractor, so it was not clear to me.
Please take a look at the Kafka-to-HDFS writer and see if that helps. If it does not do exactly what you require, you may have to plug in a customized writer.


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done?
Why can't you have another subscriber to Kafka that renders the values, rather than having the KafkaExtractor implementation render them?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Hi,

Using gobblin_0.10.0, we want to use the module KafkaSimpleJsonExtractor.java in order to see both Kafka record keys and values.

How does one configure a job to achieve it?






Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
Yes, you seem to be correct; the code never lies :)

Can you try implementing a customised Extractor that extends
KafkaSimpleJsonExtractor and carries the Alias annotation?

The other option would be to add the Alias annotation to
KafkaSimpleJsonExtractor itself. I need to see what implications that would
have and why it was not done earlier. I need some time to look deeper into
it and hope to find some during the weekend.
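
As a rough illustration of the first option, the sketch below shows how an alias annotation can let a source pick an extractor class by a short name. Everything in it is a stand-in written for this example: Alias, BaseExtractor, JsonExtractor, MyKeyValueExtractor and resolve are hypothetical names that only mimic the idea behind Gobblin's @Alias and ClassAliasResolver; they are not the Gobblin API, and the real resolver may find its candidates differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

class AliasSketch {
    // Stand-in alias annotation (hypothetical); retained at runtime so reflection can read it.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Alias {
        String value();
    }

    // Stand-ins for the extractor hierarchy discussed in the thread.
    static class BaseExtractor {}

    // No alias here, so a lookup by short name cannot find this class.
    static class JsonExtractor extends BaseExtractor {}

    // A subclass that adds nothing except the alias.
    @Alias("MyKeyValueExtractor")
    static class MyKeyValueExtractor extends JsonExtractor {}

    // Fixed candidate list for the sketch; a real resolver would discover classes itself.
    static final List<Class<? extends BaseExtractor>> CANDIDATES =
        Arrays.asList(JsonExtractor.class, MyKeyValueExtractor.class);

    // Returns the candidate whose @Alias value equals the given name, or null if none matches.
    static Class<? extends BaseExtractor> resolve(String name) {
        for (Class<? extends BaseExtractor> candidate : CANDIDATES) {
            Alias alias = candidate.getAnnotation(Alias.class);
            if (alias != null && alias.value().equals(name)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(resolve("MyKeyValueExtractor")); // the annotated subclass
        System.out.println(resolve("JsonExtractor"));       // no alias on that class
    }
}
```

The point of the sketch is only that the subclass needs no new behaviour: the annotation alone makes it reachable by name, while the unannotated parent stays invisible to the lookup.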





RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
Our correspondence helps with understanding Gobblin. ☺

We’re focusing on the source, so no writers are involved yet. We want to extract both the key and the value from each Kafka message.

Three classes extend the abstract class KafkaSource:
KafkaDeserializerSource,
KafkaSimpleSource and
UniversalKafkaSource.

Each has a getExtractor() method, and these return, respectively:

KafkaDeserializerExtractor,
KafkaSimpleExtractor and
a class found by ClassAliasResolver.resolveClass.

The first two have a decodeRecord() method that takes a ByteArrayBasedKafkaRecord parameter, and each calls ByteArrayBasedKafkaRecord.getMessageBytes(). Neither calls ByteArrayBasedKafkaRecord.getKeyBytes(), so these sources won’t work for us.

The UniversalKafkaSource class looks interesting because it will search for a class that extends KafkaExtractor and instantiate it. But there’s a catch: the class must be annotated with @Alias. If class KafkaSimpleJsonExtractor were annotated with @Alias, I think I could use UniversalKafkaSource along with the property gobblin.source.kafka.extractorType. As you know, KafkaSimpleJsonExtractor.decodeRecord() combines both ByteArrayBasedKafkaRecord.getKeyBytes() and ByteArrayBasedKafkaRecord.getMessageBytes(), which is the result we seek.

Or did we miss something?
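
If such an alias existed, the source part of the job configuration might look roughly like the fragment below. The property name gobblin.source.kafka.extractorType is the one named in this message. The alias value is an assumption, since KafkaSimpleJsonExtractor carries no @Alias in the version discussed here, and the package of UniversalKafkaSource is assumed to be the same as that of KafkaSimpleSource.

```properties
# Assumed package for UniversalKafkaSource (same as KafkaSimpleSource)
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
# Hypothetical alias; works only once the extractor class is annotated with it
gobblin.source.kafka.extractorType=KafkaSimpleJsonExtractor
```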

Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
Do you not have this configuration?
***********************************************************************
writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
***********************************************************************

I scanned the code for SimpleDataWriterBuilder, which uses the
SimpleDataWriter that writes the byte array to HDFS (I don't see the
HDFS configuration, so I assume that the file is only in HDFS format and
would be copied to the Hadoop file system after it is created).


It seems that you want to store the key/value in JSON format (or some
other format) and not the way the default SimpleDataWriter does it. You
may look into configuring the KafkaDeserializerExtractor with the required
deserializer type, JSON or otherwise.

Once this is done, you can use the KafkaSimpleJsonExtractor that you asked
about at the beginning.

So, to conclude, what I understand is that you don't want the data to be
stored as a byte array; you require JSON (or another format).





RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
Thank you for your time spent on our question.

This quick start document,
https://gobblin.readthedocs.io/en/latest/case-studies/Kafka-HDFS-Ingestion/,
shows “source.class=org.apache.gobblin.source.extractor.extract.kafka.KafkaSimpleSource”. That class overrides the getExtractor method to return “new KafkaSimpleExtractor(state)”, and KafkaSimpleExtractor overrides the decodeRecord method to return just the Kafka message value:

  return kafkaConsumerRecord.getMessageBytes();

Since we want both key and value stored in HDFS, the class KafkaSimpleJsonExtractor appears to provide the desired decodeRecord method:

protected byte[] decodeRecord(ByteArrayBasedKafkaRecord messageAndOffset) throws IOException {
    long offset = messageAndOffset.getOffset();

    // A record without a key is represented by an empty string.
    byte[] keyBytes = messageAndOffset.getKeyBytes();
    String key = (keyBytes == null) ? "" : new String(keyBytes, CHARSET);

    // A record without a value is likewise represented by an empty string.
    byte[] payloadBytes = messageAndOffset.getMessageBytes();
    String payload = (payloadBytes == null) ? "" : new String(payloadBytes, CHARSET);

    // Offset, key and payload are wrapped together and serialized as JSON.
    KafkaRecord record = new KafkaRecord(offset, key, payload);

    byte[] decodedRecord = gson.toJson(record).getBytes(CHARSET);
    return decodedRecord;
}
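
To make the effect of that method concrete, here is a minimal, self-contained sketch of the same wrapping step. It is not Gobblin code: the class KeyValueJsonSketch and its methods wrap and escape are hypothetical stand-ins, the JSON is built by hand so that neither Gobblin nor Gson is needed on the classpath, the field names offset, key and payload are assumed from the KafkaRecord constructor arguments above, and UTF-8 is assumed for CHARSET.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the key/value wrapping done in decodeRecord above.
// Not Gobblin code; field names and the UTF-8 charset are assumptions.
class KeyValueJsonSketch {

    // Escape the characters that must be escaped inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become 4-digit hex escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Same null handling as above: a missing key or payload becomes "".
    static byte[] wrap(long offset, byte[] keyBytes, byte[] payloadBytes) {
        String key = (keyBytes == null) ? "" : new String(keyBytes, StandardCharsets.UTF_8);
        String payload = (payloadBytes == null) ? "" : new String(payloadBytes, StandardCharsets.UTF_8);
        String json = "{\"offset\":" + offset
                + ",\"key\":\"" + escape(key)
                + "\",\"payload\":\"" + escape(payload) + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = wrap(42L,
                "1001".getBytes(StandardCharsets.UTF_8),
                "some string".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only that each output record carries the key next to the value, which is what we would like to land in HDFS; the exact JSON that Gson produces for KafkaRecord may differ in field names or ordering.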

The architecture diagram leads us to think that the extractor is the point where both Kafka key and value are visible in the Gobblin pipeline:
https://gobblin.readthedocs.io/en/latest/Gobblin-Architecture/#gobblin-constructs

We are eager to learn from you.


From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 9:45 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

For writing the data to HDFS you need to take a look at the Writer implementation. The Extractor is used for pulling the data, which is thereafter passed through various stages before it is stored via the Writer.
What I meant by the other subscriber was to add one more consumer which will read the key/value, e.g. kafka-console-consumer.sh, which comes with Kafka.
I was not aware earlier that you wanted to store the data in HDFS; you had only given the reference to the Extractor, so it was not clear to me.
Please take a look at the Kafka to HDFS writer and see if that helps. If it does not do exactly what you require, you may have to plug in a customized writer.


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done?
Why can't you have the other subscriber to the kafka which will render the values rather than KafkaExtractor implementation rendering the same?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Hi,

Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java in order to see both kafka record keys and values.

How does one configure a job to achieve it?




Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
For writing the data to HDFS you need to take a look at the Writer
implementation. The Extractor is used for pulling the data, which is
thereafter passed through various stages before it is stored via the
Writer.
What I meant by the other subscriber was to add one more consumer which
will read the key/value, e.g. kafka-console-consumer.sh, which comes with
Kafka.
I was not aware earlier that you wanted to store the data in HDFS; you
had only given the reference to the Extractor, so it was not clear to me.
Please take a look at the Kafka to HDFS writer and see if that helps. If
it does not do exactly what you require, you may have to plug in a
customized writer.


On Thu, Apr 12, 2018 at 6:47 PM, Kidwell, Jack <Ja...@comcast.com>
wrote:

> We want to extract both key and value from a Kafka message and publish the
> combined information to HDFS. Keys contain numeric ids for the strings
> contained in values.
>
>
>
> Please explain “other subscriber”. Are you proposing a different Kafka
> message structure?
>
>
>
> KafkaSimpleJsonExtractor.java caught my attention because it extracts both
> keys and values and puts them in decodedRecord.
>
>
>
> *From:* Vicky Kak [mailto:vicky.kak@gmail.com]
> *Sent:* Thursday, April 12, 2018 1:17 AM
> *To:* user@gobblin.incubator.apache.org
> *Subject:* Re: KafkaSimpleJsonExtractor
>
>
>
> I am not sure what you are asking for. Do you want to see the keys/values
> rendered in the logs while the extraction is being done?
>
> Why can't you have the other subscriber to the kafka which will render the
> values rather than KafkaExtractor implementation rendering the same?
>
>
>
>
>
> On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <
> Jack_Kidwelljr@comcast.com> wrote:
>
> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java
> in order to see both kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>
>
>
>
>

RE: KafkaSimpleJsonExtractor

Posted by "Kidwell, Jack" <Ja...@comcast.com>.
We want to extract both key and value from a Kafka message and publish the combined information to HDFS. Keys contain numeric ids for the strings contained in values.

Please explain “other subscriber”. Are you proposing a different Kafka message structure?

KafkaSimpleJsonExtractor.java caught my attention because it extracts both keys and values and puts them in decodedRecord.

From: Vicky Kak [mailto:vicky.kak@gmail.com]
Sent: Thursday, April 12, 2018 1:17 AM
To: user@gobblin.incubator.apache.org
Subject: Re: KafkaSimpleJsonExtractor

I am not sure what you are asking for. Do you want to see the keys/values rendered in the logs while the extraction is being done?
Why can't you have the other subscriber to the kafka which will render the values rather than KafkaExtractor implementation rendering the same?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com> wrote:
Hi,

Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java in order to see both kafka record keys and values.

How does one configure a job to achieve it?



Re: KafkaSimpleJsonExtractor

Posted by Vicky Kak <vi...@gmail.com>.
I am not sure what you are asking for. Do you want to see the keys/values
rendered in the logs while the extraction is being done?
Why can't you have the other subscriber to the kafka which will render the
values rather than KafkaExtractor implementation rendering the same?


On Wed, Apr 11, 2018 at 11:17 PM, Kidwell, Jack <Ja...@comcast.com>
wrote:

> Hi,
>
>
>
> Using gobblin_0.10.0, we want to use module, KafkaSimpleJsonExtractor.java
> in order to see both kafka record keys and values.
>
>
>
> How does one configure a job to achieve it?
>
>
>