You are viewing a plain-text version of this content. The link to the canonical version ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/12/17 17:01:11 UTC
[pulsar] branch master updated: improve datagenerator source (#3203)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ee53ab6 improve datagenerator source (#3203)
ee53ab6 is described below
commit ee53ab6f21299dad0f19ab42caa94a7e9c336167
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Dec 17 09:01:06 2018 -0800
improve datagenerator source (#3203)
* improve datagenerator source
* cleaning up
---
pulsar-io/data-genenator/pom.xml | 5 ++
...atorSource.java => DataGeneratorPrintSink.java} | 30 ++-----
.../io/datagenerator/DataGeneratorSource.java | 4 +-
.../org/apache/pulsar/io/datagenerator/Person.java | 96 ++++++++++++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 1 +
5 files changed, 111 insertions(+), 25 deletions(-)
diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-genenator/pom.xml
index 8e9f7ad..6a6b72a 100644
--- a/pulsar-io/data-genenator/pom.xml
+++ b/pulsar-io/data-genenator/pom.xml
@@ -43,6 +43,11 @@
<artifactId>jfairy</artifactId>
<version>0.5.9</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
</dependencies>
diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
similarity index 57%
copy from pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
copy to pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
index 6087747..6944247 100644
--- a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java
@@ -18,38 +18,24 @@
*/
package org.apache.pulsar.io.datagenerator;
-import io.codearte.jfairy.Fairy;
-import io.codearte.jfairy.producer.person.Person;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Source;
-import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
import java.util.Map;
-import java.util.Optional;
-
-public class DataGeneratorSource implements Source<Person> {
+@Slf4j
+public class DataGeneratorPrintSink implements Sink<Person> {
@Override
- public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
}
@Override
- public Record<Person> read() throws Exception {
- Thread.sleep(50);
- Fairy fairy = Fairy.create();
- return new Record<Person>() {
- @Override
- public Optional<String> getKey() {
- return Optional.empty();
- }
-
- @Override
- public Person getValue() {
- return fairy.person();
- }
- };
+ public void write(Record<Person> record) throws Exception {
+ log.info("RECV: {}", record.getValue());
}
@Override
diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
index 6087747..1a9f63f 100644
--- a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
+++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.io.datagenerator;
import io.codearte.jfairy.Fairy;
-import io.codearte.jfairy.producer.person.Person;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
@@ -38,7 +37,6 @@ public class DataGeneratorSource implements Source<Person> {
@Override
public Record<Person> read() throws Exception {
Thread.sleep(50);
- Fairy fairy = Fairy.create();
return new Record<Person>() {
@Override
public Optional<String> getKey() {
@@ -47,7 +45,7 @@ public class DataGeneratorSource implements Source<Person> {
@Override
public Person getValue() {
- return fairy.person();
+ return new Person(Fairy.create().person());
}
};
}
diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
new file mode 100644
index 0000000..c822d31
--- /dev/null
+++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.datagenerator;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * A copy of {@code io.codearte.jfairy.producer.person.Person}. The jfairy class
+ * lacks the no-argument constructor needed to deserialize it as a POJO, so
+ * generated people are converted into this class instead.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Person {
+ private Address address;
+ private String firstName;
+ private String middleName;
+ private String lastName;
+ private String email;
+ private String username;
+ private String password;
+ private Sex sex;
+ private String telephoneNumber;
+ @org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", \"logicalType\": \"timestamp-millis\" }")
+ private long dateOfBirth;
+ private Integer age;
+ private Company company;
+ private String companyEmail;
+ private String nationalIdentityCardNumber;
+ private String nationalIdentificationNumber;
+ private String passportNumber;
+
+ public enum Sex {
+ MALE,
+ FEMALE
+ }
+
+ /** Copies the fields of the given jfairy person; dateOfBirth is stored as epoch milliseconds. */
+ public Person(io.codearte.jfairy.producer.person.Person person) {
+ this(new Address(person.getAddress()), person.getFirstName(), person.getMiddleName(), person.getLastName(),
+ person.getEmail(), person.getUsername(), person.getPassword(), Sex.valueOf(person.getSex().name()),
+ person.getTelephoneNumber(), person.getDateOfBirth().getMillis(),
+ person.getAge(), new Company(person.getCompany()), person.getCompanyEmail(),
+ person.getNationalIdentityCardNumber(), person.getNationalIdentificationNumber(),
+ person.getPassportNumber());
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class Company {
+ private String name;
+ private String domain;
+ private String email;
+ private String vatIdentificationNumber;
+ /** Copies the fields of the given jfairy company. */
+ public Company(io.codearte.jfairy.producer.company.Company company) {
+ this(company.getName(), company.getDomain(), company.getEmail(), company.getVatIdentificationNumber());
+ }
+ }
+
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ public static class Address {
+ protected String street;
+ protected String streetNumber;
+ protected String apartmentNumber;
+ protected String postalCode;
+ protected String city;
+
+ /** Copies the fields of the given jfairy address. */
+ public Address(io.codearte.jfairy.producer.person.Address address) {
+ this(address.getStreet(), address.getStreetNumber(), address.getApartmentNumber(), address.getPostalCode(), address.getCity());
+ }
+ }
+}
diff --git a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
index d0fd7a6..0e5d723 100644
--- a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -20,3 +20,4 @@
name: data-generator
description: Test data generator source
sourceClass: org.apache.pulsar.io.datagenerator.DataGeneratorSource
+sinkClass: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink