You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/08/04 14:22:03 UTC
Null pointer exception while trying to serialize a protobuf message
Folks,
I wrote a custom Data source to test me CEP logic. The custom data source
looks like :
public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
private boolean running = true;
private final Random random;
public CustomerDataSource() {
this.random = new Random();
}
@Override
public void run(SourceContext<CustomerMessage> ctx) throws Exception {
while (running) {
new CustomerDataGen().generateMessages().
forEach(element -> ctx.collect(element));
Thread.sleep(10000);
}
}
@Override
public void cancel() {
running = false;
}
}
public class CustomerDataGen {
public CustomerDataGen() {
this.random = new Random();
}
@Override
public List<CustomerMessage> generateMessages() throws
InterruptedException {
List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
messages.add(getMessage());
return messages;
}
private CustomerMessage getMessage() {
Instant time = Instant.now();
Timestamp eventTimeStamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
Timestamp creationTimeStamp =
Timestamp.newBuilder().setSeconds(time.getEpochSecond()
-1).setNanos(0).build();
return CustomerMessage.newBuilder().
setName("SomeCustomer").
setEventTimestamp(eventTimeStamp).
setCustomerId("01234").
addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
setEmail("customer@foo.com").
build();
}
}
In my Main program :
.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class,
ProtobufSerializer.class);
env.addSource(new CustomerDataSource());
env.execute();
When I run the program, I get the following exception :
Caused by: java.lang.NullPointerException
at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
I am having a tough time figuring out why. Can someone help me out as
to where am I going wrong?
Re: Null pointer exception while trying to serialize a protobuf message
Posted by Ted Yu <yu...@gmail.com>.
I searched in Flink (and hbase) for GeneratedMessageV3 but didn't find any
reference.
Which version of protobuf did you use to generate the class ?
Please copy user@ in the future so that more people can help.
On Fri, Aug 4, 2017 at 8:27 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> public final class CustomerMessage extends GeneratedMessageV3 implements
> CustomerMessageOrBuilder {
> private int bitField0_;
> public static final int CUSTOMER_ID_FIELD_NUMBER = 1;
> private volatile Object customerId_;
> public static final int EVENT_TIMESTAMP_FIELD_NUMBER = 2;
> private Timestamp eventTimestamp_;
> public static final int NAME_FIELD_NUMBER = 4;
> private volatile Object name_;
> public static final int EMAIL_FIELD_NUMBER = 5;
> private volatile Object email_;
> public static final int PHONE_FIELD_NUMBER = 6;
> private volatile Object phone_;
> public static final int DEVICE_ID_LOGGED_IN_FROM_FIELD_NUMBER = 7;
> private volatile Object deviceIdLoggedInFrom_;
> public static final int REGISTERED_PHONES_FIELD_NUMBER = 8;
> private LazyStringList registeredPhones_;
> private byte memoizedIsInitialized;
> private static final long serialVersionUID = 0L;
> private static final CustomerMessage DEFAULT_INSTANCE = new
> CustomerMessage();
> private static final Parser<CustomerMessage> PARSER = new
> AbstractParser() {
> public CustomerMessage parsePartialFrom(CodedInputStream input,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
> return new CustomerMessage(input, extensionRegistry, null);
> }
> };
>
> private CustomerMessage(com.google.protobuf.GeneratedMessageV3.Builder<?>
> builder) {
> super(builder);
> this.memoizedIsInitialized = -1;
> }
>
> private CustomerMessage() {
> this.memoizedIsInitialized = -1;
> this.customerId_ = "";
> this.name_ = "";
> this.email_ = "";
> this.phone_ = "";
> this.deviceIdLoggedInFrom_ = "";
> this.registeredPhones_ = LazyStringArrayList.EMPTY;
> }
>
> public final UnknownFieldSet getUnknownFields() {
> return UnknownFieldSet.getDefaultInstance();
> }
>
> private CustomerMessage(CodedInputStream input, ExtensionRegistryLite
> extensionRegistry) throws InvalidProtocolBufferException {
> this();
> int mutable_bitField0_ = 0;
>
> try {
> boolean e = false;
>
> while(!e) {
> int tag = input.readTag();
> String s;
> switch(tag) {
> case 0:
> e = true;
> break;
> case 10:
> s = input.readStringRequireUtf8();
> this.customerId_ = s;
> break;
> case 18:
> com.google.protobuf.Timestamp.Builder s1 = null;
> if(this.eventTimestamp_ != null) {
> s1 = this.eventTimestamp_.toBuilder();
> }
>
> this.eventTimestamp_ = (Timestamp)input.readMessage(Timestamp.parser(),
> extensionRegistry);
> if(s1 != null) {
> s1.mergeFrom(this.eventTimestamp_);
> this.eventTimestamp_ = s1.buildPartial();
> }
> break;
> case 34:
> s = input.readStringRequireUtf8();
> this.name_ = s;
> break;
> case 42:
> s = input.readStringRequireUtf8();
> this.email_ = s;
> break;
> case 50:
> s = input.readStringRequireUtf8();
> this.phone_ = s;
> break;
> case 58:
> s = input.readStringRequireUtf8();
> this.deviceIdLoggedInFrom_ = s;
> break;
> case 66:
> s = input.readStringRequireUtf8();
> if((mutable_bitField0_ & 64) != 64) {
> this.registeredPhones_ = new LazyStringArrayList();
> mutable_bitField0_ |= 64;
> }
>
> this.registeredPhones_.add(s);
> break;
> default:
> if(!input.skipField(tag)) {
> e = true;
> }
> }
> }
> } catch (InvalidProtocolBufferException var11) {
> throw var11.setUnfinishedMessage(this);
> } catch (IOException var12) {
> throw (new InvalidProtocolBufferException
> (var12)).setUnfinishedMessage(this);
> } finally {
> if((mutable_bitField0_ & 64) == 64) {
> this.registeredPhones_ = this.registeredPhones_.
> getUnmodifiableView();
> }
>
> this.makeExtensionsImmutable();
> }
>
> }
>
> public static final Descriptor getDescriptor() {
> return CustomerLoginProto.internal_static_gojek_esb_customer_
> CustomerMessage_descriptor;
> }
>
> protected FieldAccessorTable internalGetFieldAccessorTable() {
> return CustomerLoginProto.internal_static_gojek_esb_customer_
> CustomerMessage_fieldAccessorTable.ensureFieldAccessorsInitialized(CustomerMessage.class,
> CustomerMessage.Builder.class);
> }
>
> public String getCustomerId() {
> Object ref = this.customerId_;
> if(ref instanceof String) {
> return (String)ref;
> } else {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.customerId_ = s;
> return s;
> }
> }
>
> public ByteString getCustomerIdBytes() {
> Object ref = this.customerId_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.customerId_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public boolean hasEventTimestamp() {
> return this.eventTimestamp_ != null;
> }
>
> public Timestamp getEventTimestamp() {
> return this.eventTimestamp_ == null?Timestamp.
> getDefaultInstance():this.eventTimestamp_;
> }
>
> public TimestampOrBuilder getEventTimestampOrBuilder() {
> return this.getEventTimestamp();
> }
>
> public String getName() {
> Object ref = this.name_;
> if(ref instanceof String) {
> return (String)ref;
> } else {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.name_ = s;
> return s;
> }
> }
>
> public ByteString getNameBytes() {
> Object ref = this.name_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.name_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public String getEmail() {
> Object ref = this.email_;
> if(ref instanceof String) {
> return (String)ref;
> } else {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.email_ = s;
> return s;
> }
> }
>
> public ByteString getEmailBytes() {
> Object ref = this.email_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.email_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public String getPhone() {
> Object ref = this.phone_;
> if(ref instanceof String) {
> return (String)ref;
> } else {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.phone_ = s;
> return s;
> }
> }
>
> public ByteString getPhoneBytes() {
> Object ref = this.phone_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.phone_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public String getDeviceIdLoggedInFrom() {
> Object ref = this.deviceIdLoggedInFrom_;
> if(ref instanceof String) {
> return (String)ref;
> } else {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.deviceIdLoggedInFrom_ = s;
> return s;
> }
> }
>
> public ByteString getDeviceIdLoggedInFromBytes() {
> Object ref = this.deviceIdLoggedInFrom_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.deviceIdLoggedInFrom_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public ProtocolStringList getRegisteredDevicesList() {
> return this.registeredPhones_;
> }
>
> public int getRegisteredDevicesCount() {
> return this.registeredPhones_.size();
> }
>
> public String getRegisteredDevices(int index) {
> return (String)this.registeredPhones_.get(index);
> }
>
> public ByteString getRegisteredDevicesBytes(int index) {
> return this.registeredPhones_.getByteString(index);
> }
>
> public final boolean isInitialized() {
> byte isInitialized = this.memoizedIsInitialized;
> if(isInitialized == 1) {
> return true;
> } else if(isInitialized == 0) {
> return false;
> } else {
> this.memoizedIsInitialized = 1;
> return true;
> }
> }
>
> public void writeTo(CodedOutputStream output) throws IOException {
> if(!this.getCustomerIdBytes().isEmpty()) {
> GeneratedMessageV3.writeString(output, 1, this.customerId_);
> }
>
> if(this.eventTimestamp_ != null) {
> output.writeMessage(2, this.getEventTimestamp());
> }
>
> if(!this.getNameBytes().isEmpty()) {
> GeneratedMessageV3.writeString(output, 4, this.name_);
> }
>
> if(!this.getEmailBytes().isEmpty()) {
> GeneratedMessageV3.writeString(output, 5, this.email_);
> }
>
> if(!this.getPhoneBytes().isEmpty()) {
> GeneratedMessageV3.writeString(output, 6, this.phone_);
> }
>
> if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
> GeneratedMessageV3.writeString(output, 7,
> this.deviceIdLoggedInFrom_);
> }
>
> for(int i = 0; i < this.registeredPhones_.size(); ++i) {
> GeneratedMessageV3.writeString(output, 8,
> this.registeredPhones_.getRaw(i));
> }
>
> }
>
> public int getSerializedSize() {
> int size = this.memoizedSize;
> if(size != -1) {
> return size;
> } else {
> size = 0;
> if(!this.getCustomerIdBytes().isEmpty()) {
> size += GeneratedMessageV3.computeStringSize(1,
> this.customerId_);
> }
>
> if(this.eventTimestamp_ != null) {
> size += CodedOutputStream.computeMessageSize(2,
> this.getEventTimestamp());
> }
>
> if(!this.getNameBytes().isEmpty()) {
> size += GeneratedMessageV3.computeStringSize(4,
> this.name_);
> }
>
> if(!this.getEmailBytes().isEmpty()) {
> size += GeneratedMessageV3.computeStringSize(5,
> this.email_);
> }
>
> if(!this.getPhoneBytes().isEmpty()) {
> size += GeneratedMessageV3.computeStringSize(6,
> this.phone_);
> }
>
> if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
> size += GeneratedMessageV3.computeStringSize(7,
> this.deviceIdLoggedInFrom_);
> }
>
> int dataSize = 0;
>
> for(int i = 0; i < this.registeredPhones_.size(); ++i) {
> dataSize += computeStringSizeNoTag(this.
> registeredPhones_.getRaw(i));
> }
>
> size += dataSize;
> size += 1 * this.getRegisteredDevicesList().size();
> this.memoizedSize = size;
> return size;
> }
> }
>
> public boolean equals(Object obj) {
> if(obj == this) {
> return true;
> } else if(!(obj instanceof CustomerMessage)) {
> return super.equals(obj);
> } else {
> CustomerMessage other = (CustomerMessage)obj;
> boolean result = true;
> result = result && this.getCustomerId().equals(
> other.getCustomerId());
> result = result && this.hasEventTimestamp() ==
> other.hasEventTimestamp();
> if(this.hasEventTimestamp()) {
> result = result && this.getEventTimestamp().equals(other.
> getEventTimestamp());
> }
>
> result = result && this.getName().equals(other.getName());
> result = result && this.getEmail().equals(other.getEmail());
> result = result && this.getPhone().equals(other.getPhone());
> result = result && this.getDeviceIdLoggedInFrom()
> .equals(other.getDeviceIdLoggedInFrom());
> result = result && this.getRegisteredDevicesList(
> ).equals(other.getRegisteredDevicesList());
> return result;
> }
> }
>
> public int hashCode() {
> if(this.memoizedHashCode != 0) {
> return this.memoizedHashCode;
> } else {
> byte hash = 41;
> int hash1 = 19 * hash + this.getDescriptorForType().
> hashCode();
> hash1 = 37 * hash1 + 1;
> hash1 = 53 * hash1 + this.getCustomerId().hashCode();
> if(this.hasEventTimestamp()) {
> hash1 = 37 * hash1 + 2;
> hash1 = 53 * hash1 + this.getEventTimestamp().hashCode();
> }
>
> hash1 = 37 * hash1 + 4;
> hash1 = 53 * hash1 + this.getName().hashCode();
> hash1 = 37 * hash1 + 5;
> hash1 = 53 * hash1 + this.getEmail().hashCode();
> hash1 = 37 * hash1 + 6;
> hash1 = 53 * hash1 + this.getPhone().hashCode();
> hash1 = 37 * hash1 + 7;
> hash1 = 53 * hash1 + this.getDeviceIdLoggedInFrom()
> .hashCode();
> if(this.getRegisteredDevicesCount() > 0) {
> hash1 = 37 * hash1 + 8;
> hash1 = 53 * hash1 + this.getRegisteredDevicesList(
> ).hashCode();
> }
>
> hash1 = 29 * hash1 + this.unknownFields.hashCode();
> this.memoizedHashCode = hash1;
> return hash1;
> }
> }
>
> public static CustomerMessage parseFrom(ByteString data) throws
> InvalidProtocolBufferException {
> return (CustomerMessage)PARSER.parseFrom(data);
> }
>
> public static CustomerMessage parseFrom(ByteString data,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
> return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
> }
>
> public static CustomerMessage parseFrom(byte[] data) throws
> InvalidProtocolBufferException {
> return (CustomerMessage)PARSER.parseFrom(data);
> }
>
> public static CustomerMessage parseFrom(byte[] data,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
> return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
> }
>
> public static CustomerMessage parseFrom(InputStream input) throws
> IOException {
> return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input);
> }
>
> public static CustomerMessage parseFrom(InputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
> return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input, extensionRegistry);
> }
>
> public static CustomerMessage parseDelimitedFrom(InputStream input)
> throws IOException {
> return (CustomerMessage)GeneratedMessageV3.
> parseDelimitedWithIOException(PARSER, input);
> }
>
> public static CustomerMessage parseDelimitedFrom(InputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
> return (CustomerMessage)GeneratedMessageV3.
> parseDelimitedWithIOException(PARSER, input, extensionRegistry);
> }
>
> public static CustomerMessage parseFrom(CodedInputStream input) throws
> IOException {
> return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input);
> }
>
> public static CustomerMessage parseFrom(CodedInputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
> return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input, extensionRegistry);
> }
>
> public CustomerMessage.Builder newBuilderForType() {
> return newBuilder();
> }
>
> public static CustomerMessage.Builder newBuilder() {
> return DEFAULT_INSTANCE.toBuilder();
> }
>
> public static CustomerMessage.Builder newBuilder(CustomerMessage
> prototype) {
> return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
> }
>
> public CustomerMessage.Builder toBuilder() {
> return this == DEFAULT_INSTANCE?new CustomerMessage.Builder(null):(new
> CustomerMessage.Builder(null)).mergeFrom(this);
> }
>
> protected CustomerMessage.Builder newBuilderForType(BuilderParent
> parent) {
> CustomerMessage.Builder builder = new CustomerMessage.Builder(parent,
> null);
> return builder;
> }
>
> public static CustomerMessage getDefaultInstance() {
> return DEFAULT_INSTANCE;
> }
>
> public static Parser<CustomerMessage> parser() {
> return PARSER;
> }
>
> public Parser<CustomerMessage> getParserForType() {
> return PARSER;
> }
>
> public CustomerMessage getDefaultInstanceForType() {
> return DEFAULT_INSTANCE;
> }
>
> public static final class Builder extends com.google.protobuf.
> GeneratedMessageV3.Builder<CustomerMessage.Builder> implements
> CustomerMessageOrBuilder {
> private int bitField0_;
> private Object customerId_;
> private Timestamp eventTimestamp_;
> private SingleFieldBuilderV3<Timestamp,
> com.google.protobuf.Timestamp.Builder, TimestampOrBuilder>
> eventTimestampBuilder_;
> private Object name_;
> private Object email_;
> private Object phone_;
> private Object deviceIdLoggedInFrom_;
> private LazyStringList registeredPhones_;
>
> public static final Descriptor getDescriptor() {
> return CustomerLoginProto.internal_static_gojek_esb_customer_
> CustomerMessage_descriptor;
> }
>
> protected FieldAccessorTable internalGetFieldAccessorTable() {
> return CustomerLoginProto.internal_static_gojek_esb_customer_
> CustomerMessage_fieldAccessorTable.ensureFieldAccessorsInitialized(CustomerMessage.class,
> CustomerMessage.Builder.class);
> }
>
> private Builder() {
> this.customerId_ = "";
> this.eventTimestamp_ = null;
> this.name_ = "";
> this.email_ = "";
> this.phone_ = "";
> this.deviceIdLoggedInFrom_ = "";
> this.registeredPhones_ = LazyStringArrayList.EMPTY;
> this.maybeForceBuilderInitialization();
> }
>
> private Builder(BuilderParent parent) {
> super(parent);
> this.customerId_ = "";
> this.eventTimestamp_ = null;
> this.name_ = "";
> this.email_ = "";
> this.phone_ = "";
> this.deviceIdLoggedInFrom_ = "";
> this.registeredPhones_ = LazyStringArrayList.EMPTY;
> this.maybeForceBuilderInitialization();
> }
>
> private void maybeForceBuilderInitialization() {
> if(CustomerMessage.alwaysUseFieldBuilders) {
> ;
> }
>
> }
>
> public CustomerMessage.Builder clear() {
> super.clear();
> this.customerId_ = "";
> if(this.eventTimestampBuilder_ == null) {
> this.eventTimestamp_ = null;
> } else {
> this.eventTimestamp_ = null;
> this.eventTimestampBuilder_ = null;
> }
>
> this.name_ = "";
> this.email_ = "";
> this.phone_ = "";
> this.deviceIdLoggedInFrom_ = "";
> this.registeredPhones_ = LazyStringArrayList.EMPTY;
> this.bitField0_ &= -65;
> return this;
> }
>
> public Descriptor getDescriptorForType() {
> return CustomerLoginProto.internal_static_gojek_esb_customer_
> CustomerMessage_descriptor;
> }
>
> public CustomerMessage getDefaultInstanceForType() {
> return CustomerMessage.DEFAULT_INSTANCE;
> }
>
> public CustomerMessage build() {
> CustomerMessage result = this.buildPartial();
> if(!result.isInitialized()) {
> throw newUninitializedMessageException(result);
> } else {
> return result;
> }
> }
>
> public CustomerMessage buildPartial() {
> CustomerMessage result = new CustomerMessage(this, null);
> int from_bitField0_ = this.bitField0_;
> byte to_bitField0_ = 0;
> result.customerId_ = this.customerId_;
> if(this.eventTimestampBuilder_ == null) {
> result.eventTimestamp_ = this.eventTimestamp_;
> } else {
> result.eventTimestamp_ = (Timestamp)this.
> eventTimestampBuilder_.build();
> }
>
> result.name_ = this.name_;
> result.email_ = this.email_;
> result.phone_ = this.phone_;
> result.deviceIdLoggedInFrom_ = this.deviceIdLoggedInFrom_;
> if((this.bitField0_ & 64) == 64) {
> this.registeredPhones_ = this.registeredPhones_.
> getUnmodifiableView();
> this.bitField0_ &= -65;
> }
>
> result.registeredPhones_ = this.registeredPhones_;
> result.bitField0_ = to_bitField0_;
> this.onBuilt();
> return result;
> }
>
> public CustomerMessage.Builder clone() {
> return (CustomerMessage.Builder)super.clone();
> }
>
> public CustomerMessage.Builder setField(FieldDescriptor field,
> Object value) {
> return (CustomerMessage.Builder)super.setField(field, value);
> }
>
> public CustomerMessage.Builder clearField(FieldDescriptor field) {
> return (CustomerMessage.Builder)super.clearField(field);
> }
>
> public CustomerMessage.Builder clearOneof(OneofDescriptor oneof) {
> return (CustomerMessage.Builder)super.clearOneof(oneof);
> }
>
> public CustomerMessage.Builder setRepeatedField(FieldDescriptor
> field, int index, Object value) {
> return (CustomerMessage.Builder)super.setRepeatedField(field,
> index, value);
> }
>
> public CustomerMessage.Builder addRepeatedField(FieldDescriptor
> field, Object value) {
> return (CustomerMessage.Builder)super.addRepeatedField(field,
> value);
> }
>
> public CustomerMessage.Builder mergeFrom(Message other) {
> if(other instanceof CustomerMessage) {
> return this.mergeFrom((CustomerMessage)other);
> } else {
> super.mergeFrom(other);
> return this;
> }
> }
>
> public CustomerMessage.Builder mergeFrom(CustomerMessage other) {
> if(other == CustomerMessage.DEFAULT_INSTANCE) {
> return this;
> } else {
> if(!other.getCustomerId().isEmpty()) {
> this.customerId_ = other.customerId_;
> this.onChanged();
> }
>
> if(other.hasEventTimestamp()) {
> this.mergeEventTimestamp(other.getEventTimestamp());
> }
>
> if(!other.getName().isEmpty()) {
> this.name_ = other.name_;
> this.onChanged();
> }
>
> if(!other.getEmail().isEmpty()) {
> this.email_ = other.email_;
> this.onChanged();
> }
>
> if(!other.getPhone().isEmpty()) {
> this.phone_ = other.phone_;
> this.onChanged();
> }
>
> if(!other.getDeviceIdLoggedInFrom().isEmpty()) {
> this.deviceIdLoggedInFrom_ =
> other.deviceIdLoggedInFrom_;
> this.onChanged();
> }
>
> if(!other.registeredPhones_.isEmpty()) {
> if(this.registeredPhones_.isEmpty()) {
> this.registeredPhones_ = other.registeredPhones_;
> this.bitField0_ &= -65;
> } else {
> this.ensureRegisteredDevicesIsMutable();
> this.registeredPhones_.addAll(
> other.registeredPhones_);
> }
>
> this.onChanged();
> }
>
> this.onChanged();
> return this;
> }
> }
>
> public final boolean isInitialized() {
> return true;
> }
>
> public CustomerMessage.Builder mergeFrom(CodedInputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
> CustomerMessage parsedMessage = null;
>
> try {
> parsedMessage = (CustomerMessage)CustomerMessage.PARSER.parsePartialFrom(input,
> extensionRegistry);
> } catch (InvalidProtocolBufferException var8) {
> parsedMessage = (CustomerMessage)var8.
> getUnfinishedMessage();
> throw var8.unwrapIOException();
> } finally {
> if(parsedMessage != null) {
> this.mergeFrom(parsedMessage);
> }
>
> }
>
> return this;
> }
>
> public String getCustomerId() {
> Object ref = this.customerId_;
> if(!(ref instanceof String)) {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.customerId_ = s;
> return s;
> } else {
> return (String)ref;
> }
> }
>
> public ByteString getCustomerIdBytes() {
> Object ref = this.customerId_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.customerId_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public CustomerMessage.Builder setCustomerId(String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.customerId_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder clearCustomerId() {
> this.customerId_ = CustomerMessage.DEFAULT_
> INSTANCE.getCustomerId();
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder setCustomerIdBytes(ByteString
> value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.customerId_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public boolean hasEventTimestamp() {
> return this.eventTimestampBuilder_ != null ||
> this.eventTimestamp_ != null;
> }
>
> public Timestamp getEventTimestamp() {
> return this.eventTimestampBuilder_ ==
> null?(this.eventTimestamp_ == null?Timestamp.getDefaultInstance():this.
> eventTimestamp_):(Timestamp)this.eventTimestampBuilder_.getMessage();
> }
>
> public CustomerMessage.Builder setEventTimestamp(Timestamp value) {
> if(this.eventTimestampBuilder_ == null) {
> if(value == null) {
> throw new NullPointerException();
> }
>
> this.eventTimestamp_ = value;
> this.onChanged();
> } else {
> this.eventTimestampBuilder_.setMessage(value);
> }
>
> return this;
> }
>
> public CustomerMessage.Builder setEventTimestamp(com.google.protobuf.Timestamp.Builder
> builderForValue) {
> if(this.eventTimestampBuilder_ == null) {
> this.eventTimestamp_ = builderForValue.build();
> this.onChanged();
> } else {
> this.eventTimestampBuilder_.setMessage(builderForValue.
> build());
> }
>
> return this;
> }
>
> public CustomerMessage.Builder mergeEventTimestamp(Timestamp
> value) {
> if(this.eventTimestampBuilder_ == null) {
> if(this.eventTimestamp_ != null) {
> this.eventTimestamp_ = Timestamp.newBuilder(this.
> eventTimestamp_).mergeFrom(value).buildPartial();
> } else {
> this.eventTimestamp_ = value;
> }
>
> this.onChanged();
> } else {
> this.eventTimestampBuilder_.mergeFrom(value);
> }
>
> return this;
> }
>
> public CustomerMessage.Builder clearEventTimestamp() {
> if(this.eventTimestampBuilder_ == null) {
> this.eventTimestamp_ = null;
> this.onChanged();
> } else {
> this.eventTimestamp_ = null;
> this.eventTimestampBuilder_ = null;
> }
>
> return this;
> }
>
> public com.google.protobuf.Timestamp.Builder
> getEventTimestampBuilder() {
> this.onChanged();
> return (com.google.protobuf.Timestamp.Builder)this.
> getEventTimestampFieldBuilder().getBuilder();
> }
>
> public TimestampOrBuilder getEventTimestampOrBuilder() {
> return (TimestampOrBuilder)(this.eventTimestampBuilder_ !=
> null?(TimestampOrBuilder)this.eventTimestampBuilder_.
> getMessageOrBuilder():(this.eventTimestamp_ == null?Timestamp.
> getDefaultInstance():this.eventTimestamp_));
> }
>
> private SingleFieldBuilderV3<Timestamp,
> com.google.protobuf.Timestamp.Builder, TimestampOrBuilder>
> getEventTimestampFieldBuilder() {
> if(this.eventTimestampBuilder_ == null) {
> this.eventTimestampBuilder_ = new
> SingleFieldBuilderV3(this.getEventTimestamp(),
> this.getParentForChildren(), this.isClean());
> this.eventTimestamp_ = null;
> }
>
> return this.eventTimestampBuilder_;
> }
>
> public String getName() {
> Object ref = this.name_;
> if(!(ref instanceof String)) {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.name_ = s;
> return s;
> } else {
> return (String)ref;
> }
> }
>
> public ByteString getNameBytes() {
> Object ref = this.name_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.name_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public CustomerMessage.Builder setName(String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.name_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder clearName() {
> this.name_ = CustomerMessage.DEFAULT_INSTANCE.getName();
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder setNameBytes(ByteString value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.name_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public String getEmail() {
> Object ref = this.email_;
> if(!(ref instanceof String)) {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.email_ = s;
> return s;
> } else {
> return (String)ref;
> }
> }
>
> public ByteString getEmailBytes() {
> Object ref = this.email_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.email_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public CustomerMessage.Builder setEmail(String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.email_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder clearEmail() {
> this.email_ = CustomerMessage.DEFAULT_INSTANCE.getEmail();
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder setEmailBytes(ByteString value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.email_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public String getPhone() {
> Object ref = this.phone_;
> if(!(ref instanceof String)) {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.phone_ = s;
> return s;
> } else {
> return (String)ref;
> }
> }
>
> public ByteString getPhoneBytes() {
> Object ref = this.phone_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.phone_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public CustomerMessage.Builder setPhone(String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.phone_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder clearPhone() {
> this.phone_ = CustomerMessage.DEFAULT_INSTANCE.getPhone();
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder setPhoneBytes(ByteString value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.phone_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public String getDeviceIdLoggedInFrom() {
> Object ref = this.deviceIdLoggedInFrom_;
> if(!(ref instanceof String)) {
> ByteString bs = (ByteString)ref;
> String s = bs.toStringUtf8();
> this.deviceIdLoggedInFrom_ = s;
> return s;
> } else {
> return (String)ref;
> }
> }
>
> public ByteString getDeviceIdLoggedInFromBytes() {
> Object ref = this.deviceIdLoggedInFrom_;
> if(ref instanceof String) {
> ByteString b = ByteString.copyFromUtf8((String)ref);
> this.deviceIdLoggedInFrom_ = b;
> return b;
> } else {
> return (ByteString)ref;
> }
> }
>
> public CustomerMessage.Builder setDeviceIdLoggedInFrom(String
> value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.deviceIdLoggedInFrom_ = value;
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder clearDeviceIdLoggedInFrom() {
> this.deviceIdLoggedInFrom_ = CustomerMessage.DEFAULT_INSTANCE.
> getDeviceIdLoggedInFrom();
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder setDeviceIdLoggedInFromBytes(ByteString
> value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.deviceIdLoggedInFrom_ = value;
> this.onChanged();
> return this;
> }
> }
>
> private void ensureRegisteredDevicesIsMutable() {
> if((this.bitField0_ & 64) != 64) {
> this.registeredPhones_ = new LazyStringArrayList(this.
> registeredPhones_);
> this.bitField0_ |= 64;
> }
>
> }
>
> public ProtocolStringList getRegisteredDevicesList() {
> return this.registeredPhones_.getUnmodifiableView();
> }
>
> public int getRegisteredDevicesCount() {
> return this.registeredPhones_.size();
> }
>
> public String getRegisteredDevices(int index) {
> return (String)this.registeredPhones_.get(index);
> }
>
> public ByteString getRegisteredDevicesBytes(int index) {
> return this.registeredPhones_.getByteString(index);
> }
>
> public CustomerMessage.Builder setRegisteredDevices(int index,
> String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.ensureRegisteredDevicesIsMutable();
> this.registeredPhones_.set(index, value);
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder addRegisteredDevices(String value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> this.ensureRegisteredDevicesIsMutable();
> this.registeredPhones_.add(value);
> this.onChanged();
> return this;
> }
> }
>
> public CustomerMessage.Builder addAllRegisteredDevices(Iterable<String>
> values) {
> this.ensureRegisteredDevicesIsMutable();
> com.google.protobuf.AbstractMessageLite.Builder.addAll(values,
> this.registeredPhones_);
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder clearRegisteredDevices() {
> this.registeredPhones_ = LazyStringArrayList.EMPTY;
> this.bitField0_ &= -65;
> this.onChanged();
> return this;
> }
>
> public CustomerMessage.Builder addRegisteredDevicesBytes(ByteString
> value) {
> if(value == null) {
> throw new NullPointerException();
> } else {
> CustomerMessage.checkByteStringIsUtf8(value);
> this.ensureRegisteredDevicesIsMutable();
> this.registeredPhones_.add(value);
> this.onChanged();
> return this;
> }
> }
>
> public final CustomerMessage.Builder setUnknownFields(UnknownFieldSet
> unknownFields) {
> return this;
> }
>
> public final CustomerMessage.Builder mergeUnknownFields(UnknownFieldSet
> unknownFields) {
> return this;
> }
> }
> }
>
>
>
> On Fri, Aug 4, 2017 at 8:07 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Can you show how CustomerMessage is defined ?
>>
>> Thanks
>>
>> On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <fl...@gmail.com>
>> wrote:
>>
>>> Folks,
>>>
>>> I wrote a custom Data source to test me CEP logic. The custom data
>>> source looks like :
>>>
>>> public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
>>> private boolean running = true;
>>> private final Random random;
>>>
>>> public CustomerDataSource() {
>>> this.random = new Random();
>>> }
>>>
>>> @Override
>>> public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>>> while (running) {
>>> new CustomerDataGen().generateMessages().
>>> forEach(element -> ctx.collect(element));
>>>
>>> Thread.sleep(10000);
>>> }
>>> }
>>>
>>> @Override
>>> public void cancel() {
>>> running = false;
>>> }
>>> }
>>>
>>> public class CustomerDataGen {
>>>
>>> public CustomerDataGen() {
>>> this.random = new Random();
>>> }
>>> @Override
>>> public List<CustomerMessage> generateMessages() throws InterruptedException {
>>> List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>>>
>>> messages.add(getMessage());
>>> return messages;
>>> }
>>>
>>> private CustomerMessage getMessage() {
>>> Instant time = Instant.now();
>>> Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
>>> Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
>>> return CustomerMessage.newBuilder().
>>> setName("SomeCustomer").
>>> setEventTimestamp(eventTimeStamp).
>>> setCustomerId("01234").
>>> addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>>> setEmail("customer@foo.com").
>>> build();
>>> }
>>> }
>>>
>>> In my Main program :
>>>
>>> .........
>>> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
>>> env.addSource(new CustomerDataSource());
>>>
>>> env.execute();
>>>
>>>
>>> When I run the program, I get the following exception :
>>>
>>>
>>> Caused by: java.lang.NullPointerException
>>> at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
>>> at java.util.AbstractList.add(AbstractList.java:108)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>> at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
>>> at java.util.ArrayList.forEach(ArrayList.java:1249)
>>> at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I am having a tough time figuring out why. Can someone help me out as to where am I going wrong?
>>>
>>>
>>>
>>
>
Re: Null pointer exception while trying to serialize a protobuf message
Posted by Ted Yu <yu...@gmail.com>.
Can you show how CustomerMessage is defined ?
Thanks
On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:
> Folks,
>
> I wrote a custom Data source to test me CEP logic. The custom data source
> looks like :
>
> public class CustomerDataSource extends RichParallelSourceFunction<Customer> {
> private boolean running = true;
> private final Random random;
>
> public CustomerDataSource() {
> this.random = new Random();
> }
>
> @Override
> public void run(SourceContext<CustomerMessage> ctx) throws Exception {
> while (running) {
> new CustomerDataGen().generateMessages().
> forEach(element -> ctx.collect(element));
>
> Thread.sleep(10000);
> }
> }
>
> @Override
> public void cancel() {
> running = false;
> }
> }
>
> public class CustomerDataGen {
>
> public CustomerDataGen() {
> this.random = new Random();
> }
> @Override
> public List<CustomerMessage> generateMessages() throws InterruptedException {
> List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>
> messages.add(getMessage());
> return messages;
> }
>
> private CustomerMessage getMessage() {
> Instant time = Instant.now();
> Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
> Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
> return CustomerMessage.newBuilder().
> setName("SomeCustomer").
> setEventTimestamp(eventTimeStamp).
> setCustomerId("01234").
> addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
> setEmail("customer@foo.com").
> build();
> }
> }
>
> In my Main program :
>
> .........
> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
> env.addSource(new CustomerDataSource());
>
> env.execute();
>
>
> When I run the program, I get the following exception :
>
>
> Caused by: java.lang.NullPointerException
> at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
> at java.util.AbstractList.add(AbstractList.java:108)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
> at java.util.ArrayList.forEach(ArrayList.java:1249)
> at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
>
> I am having a tough time figuring out why. Can someone help me out as to where am I going wrong?
>
>
>