Posted to user@flink.apache.org by Sridhar Chellappa <fl...@gmail.com> on 2017/08/04 14:22:03 UTC

Null pointer exception while trying to serialize a protobuf message

Folks,

I wrote a custom data source to test my CEP logic. The custom data source
looks like this:

public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    private final Random random;

    public CustomerDataSource() {
        this.random = new Random();
    }

    @Override
    public void run(SourceContext<CustomerMessage> ctx) throws Exception {
        while (running) {
            new CustomerDataGen().generateMessages().
                    forEach(element -> ctx.collect(element));

            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

public class CustomerDataGen {
    private final Random random;

    public CustomerDataGen() {
        this.random = new Random();
    }

    public List<CustomerMessage> generateMessages() throws InterruptedException {
        List<CustomerMessage> messages = new ArrayList<CustomerMessage>();

        messages.add(getMessage());
        return messages;
    }

    private CustomerMessage getMessage() {
        Instant time = Instant.now();
        Timestamp eventTimeStamp =
                Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
        Timestamp creationTimeStamp =
                Timestamp.newBuilder().setSeconds(time.getEpochSecond() - 1).setNanos(0).build();
        return CustomerMessage.newBuilder().
                setName("SomeCustomer").
                setEventTimestamp(eventTimeStamp).
                setCustomerId("01234").
                addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
                setEmail("customer@foo.com").
                build();
    }
}

In my main program:

.........
env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
env.addSource(new CustomerDataSource());

env.execute();


When I run the program, I get the following exception:


Caused by: java.lang.NullPointerException
	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
	at java.util.AbstractList.add(AbstractList.java:108)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
	at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
	at java.util.ArrayList.forEach(ArrayList.java:1249)
	at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:745)

I am having a tough time figuring out why. Can someone help me see
where I am going wrong?
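
One possible reading of the trace, offered as a sketch rather than a
confirmed diagnosis: the top frames (AbstractList.add calling
UnmodifiableLazyStringList.size from Kryo's CollectionSerializer.copy)
suggest that the list was copied generically, by creating an empty instance
of the same class and adding the elements to it, rather than through
ProtobufSerializer. If that empty instance was created without running its
constructor, the list it wraps would be null and size() would throw. The
plain-Java example below only imitates that situation. NullDelegateDemo,
ReadOnlyView and copyOutcome are hypothetical stand-ins, not Flink, Kryo or
protobuf classes.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

public class NullDelegateDemo {

    // Hypothetical stand-in for a read-only list wrapper: reads are forwarded
    // to a delegate, and add(E) is inherited unchanged from AbstractList.
    public static final class ReadOnlyView extends AbstractList<String> {
        private final List<String> delegate;

        public ReadOnlyView(List<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public String get(int index) {
            return delegate.get(index);
        }

        @Override
        public int size() {
            return delegate.size(); // throws NullPointerException if delegate is null
        }
    }

    // Imitates a generic "start from an empty instance, add each element" copy
    // and reports how it ended.
    public static String copyOutcome(List<String> original, List<String> emptyCopy) {
        try {
            for (String element : original) {
                emptyCopy.add(element); // AbstractList.add(e) calls add(size(), e)
            }
            return "copied";
        } catch (NullPointerException e) {
            return "NullPointerException";
        } catch (UnsupportedOperationException e) {
            return "UnsupportedOperationException";
        }
    }

    public static void main(String[] args) {
        List<String> phones = new ArrayList<>();
        phones.add("+9185216741972");
        List<String> original = new ReadOnlyView(phones);

        // Assumption: an instantiator that skips constructors leaves the
        // delegate field null. Passing null here simulates that state.
        System.out.println(copyOutcome(original, new ReadOnlyView(null)));

        // A properly constructed read-only wrapper still cannot be filled via add().
        System.out.println(copyOutcome(original, new ReadOnlyView(new ArrayList<>())));
    }
}
```

The first call ends in a NullPointerException raised from size(), which
matches the shape of the top two frames above. The second ends in an
UnsupportedOperationException, so even a correctly constructed read-only
list could not be copied this way. If this reading is right, the copy would
need to go through a serializer that treats the protobuf message as a whole.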

Re: Null pointer exception while trying to serialize a protobuf message

Posted by Ted Yu <yu...@gmail.com>.
I searched in Flink (and hbase) for GeneratedMessageV3 but didn't find any
reference.

Which version of protobuf did you use to generate the class?

Please copy user@ in the future so that more people can help.

On Fri, Aug 4, 2017 at 8:27 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:

> public final class CustomerMessage extends GeneratedMessageV3 implements
> CustomerMessageOrBuilder {
>     private int bitField0_;
>     public static final int CUSTOMER_ID_FIELD_NUMBER = 1;
>     private volatile Object customerId_;
>     public static final int EVENT_TIMESTAMP_FIELD_NUMBER = 2;
>     private Timestamp eventTimestamp_;
>     public static final int NAME_FIELD_NUMBER = 4;
>     private volatile Object name_;
>     public static final int EMAIL_FIELD_NUMBER = 5;
>     private volatile Object email_;
>     public static final int PHONE_FIELD_NUMBER = 6;
>     private volatile Object phone_;
>     public static final int DEVICE_ID_LOGGED_IN_FROM_FIELD_NUMBER = 7;
>     private volatile Object deviceIdLoggedInFrom_;
>     public static final int REGISTERED_PHONES_FIELD_NUMBER = 8;
>     private LazyStringList registeredPhones_;
>     private byte memoizedIsInitialized;
>     private static final long serialVersionUID = 0L;
>     private static final CustomerMessage DEFAULT_INSTANCE = new
> CustomerMessage();
>     private static final Parser<CustomerMessage> PARSER = new
> AbstractParser() {
>         public CustomerMessage parsePartialFrom(CodedInputStream input,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
>             return new CustomerMessage(input, extensionRegistry, null);
>         }
>     };
>
>     private CustomerMessage(com.google.protobuf.GeneratedMessageV3.Builder<?>
> builder) {
>         super(builder);
>         this.memoizedIsInitialized = -1;
>     }
>
>     private CustomerMessage() {
>         this.memoizedIsInitialized = -1;
>         this.customerId_ = "";
>         this.name_ = "";
>         this.email_ = "";
>         this.phone_ = "";
>         this.deviceIdLoggedInFrom_ = "";
>         this.registeredPhones_ = LazyStringArrayList.EMPTY;
>     }
>
>     public final UnknownFieldSet getUnknownFields() {
>         return UnknownFieldSet.getDefaultInstance();
>     }
>
>     private CustomerMessage(CodedInputStream input, ExtensionRegistryLite
> extensionRegistry) throws InvalidProtocolBufferException {
>         this();
>         int mutable_bitField0_ = 0;
>
>         try {
>             boolean e = false;
>
>             while(!e) {
>                 int tag = input.readTag();
>                 String s;
>                 switch(tag) {
>                 case 0:
>                     e = true;
>                     break;
>                 case 10:
>                     s = input.readStringRequireUtf8();
>                     this.customerId_ = s;
>                     break;
>                 case 18:
>                     com.google.protobuf.Timestamp.Builder s1 = null;
>                     if(this.eventTimestamp_ != null) {
>                         s1 = this.eventTimestamp_.toBuilder();
>                     }
>
>                     this.eventTimestamp_ = (Timestamp)input.readMessage(Timestamp.parser(),
> extensionRegistry);
>                     if(s1 != null) {
>                         s1.mergeFrom(this.eventTimestamp_);
>                         this.eventTimestamp_ = s1.buildPartial();
>                     }
>                     break;
>                 case 34:
>                     s = input.readStringRequireUtf8();
>                     this.name_ = s;
>                     break;
>                 case 42:
>                     s = input.readStringRequireUtf8();
>                     this.email_ = s;
>                     break;
>                 case 50:
>                     s = input.readStringRequireUtf8();
>                     this.phone_ = s;
>                     break;
>                 case 58:
>                     s = input.readStringRequireUtf8();
>                     this.deviceIdLoggedInFrom_ = s;
>                     break;
>                 case 66:
>                     s = input.readStringRequireUtf8();
>                     if((mutable_bitField0_ & 64) != 64) {
>                         this.registeredPhones_ = new LazyStringArrayList();
>                         mutable_bitField0_ |= 64;
>                     }
>
>                     this.registeredPhones_.add(s);
>                     break;
>                 default:
>                     if(!input.skipField(tag)) {
>                         e = true;
>                     }
>                 }
>             }
>         } catch (InvalidProtocolBufferException var11) {
>             throw var11.setUnfinishedMessage(this);
>         } catch (IOException var12) {
>             throw (new InvalidProtocolBufferException
> (var12)).setUnfinishedMessage(this);
>         } finally {
>             if((mutable_bitField0_ & 64) == 64) {
>                 this.registeredPhones_ = this.registeredPhones_.
> getUnmodifiableView();
>             }
>
>             this.makeExtensionsImmutable();
>         }
>
>     }
>
>     public static final Descriptor getDescriptor() {
>         return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
>     }
>
>     protected FieldAccessorTable internalGetFieldAccessorTable() {
>         return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_fieldAccessorTable
>                 .ensureFieldAccessorsInitialized(CustomerMessage.class, CustomerMessage.Builder.class);
>     }
>
>     public String getCustomerId() {
>         Object ref = this.customerId_;
>         if(ref instanceof String) {
>             return (String)ref;
>         } else {
>             ByteString bs = (ByteString)ref;
>             String s = bs.toStringUtf8();
>             this.customerId_ = s;
>             return s;
>         }
>     }
>
>     public ByteString getCustomerIdBytes() {
>         Object ref = this.customerId_;
>         if(ref instanceof String) {
>             ByteString b = ByteString.copyFromUtf8((String)ref);
>             this.customerId_ = b;
>             return b;
>         } else {
>             return (ByteString)ref;
>         }
>     }
>
>     public boolean hasEventTimestamp() {
>         return this.eventTimestamp_ != null;
>     }
>
>     public Timestamp getEventTimestamp() {
>         return this.eventTimestamp_ == null?Timestamp.
> getDefaultInstance():this.eventTimestamp_;
>     }
>
>     public TimestampOrBuilder getEventTimestampOrBuilder() {
>         return this.getEventTimestamp();
>     }
>
>     public String getName() {
>         Object ref = this.name_;
>         if(ref instanceof String) {
>             return (String)ref;
>         } else {
>             ByteString bs = (ByteString)ref;
>             String s = bs.toStringUtf8();
>             this.name_ = s;
>             return s;
>         }
>     }
>
>     public ByteString getNameBytes() {
>         Object ref = this.name_;
>         if(ref instanceof String) {
>             ByteString b = ByteString.copyFromUtf8((String)ref);
>             this.name_ = b;
>             return b;
>         } else {
>             return (ByteString)ref;
>         }
>     }
>
>     public String getEmail() {
>         Object ref = this.email_;
>         if(ref instanceof String) {
>             return (String)ref;
>         } else {
>             ByteString bs = (ByteString)ref;
>             String s = bs.toStringUtf8();
>             this.email_ = s;
>             return s;
>         }
>     }
>
>     public ByteString getEmailBytes() {
>         Object ref = this.email_;
>         if(ref instanceof String) {
>             ByteString b = ByteString.copyFromUtf8((String)ref);
>             this.email_ = b;
>             return b;
>         } else {
>             return (ByteString)ref;
>         }
>     }
>
>     public String getPhone() {
>         Object ref = this.phone_;
>         if(ref instanceof String) {
>             return (String)ref;
>         } else {
>             ByteString bs = (ByteString)ref;
>             String s = bs.toStringUtf8();
>             this.phone_ = s;
>             return s;
>         }
>     }
>
>     public ByteString getPhoneBytes() {
>         Object ref = this.phone_;
>         if(ref instanceof String) {
>             ByteString b = ByteString.copyFromUtf8((String)ref);
>             this.phone_ = b;
>             return b;
>         } else {
>             return (ByteString)ref;
>         }
>     }
>
>     public String getDeviceIdLoggedInFrom() {
>         Object ref = this.deviceIdLoggedInFrom_;
>         if(ref instanceof String) {
>             return (String)ref;
>         } else {
>             ByteString bs = (ByteString)ref;
>             String s = bs.toStringUtf8();
>             this.deviceIdLoggedInFrom_ = s;
>             return s;
>         }
>     }
>
>     public ByteString getDeviceIdLoggedInFromBytes() {
>         Object ref = this.deviceIdLoggedInFrom_;
>         if(ref instanceof String) {
>             ByteString b = ByteString.copyFromUtf8((String)ref);
>             this.deviceIdLoggedInFrom_ = b;
>             return b;
>         } else {
>             return (ByteString)ref;
>         }
>     }
>
>     public ProtocolStringList getRegisteredDevicesList() {
>         return this.registeredPhones_;
>     }
>
>     public int getRegisteredDevicesCount() {
>         return this.registeredPhones_.size();
>     }
>
>     public String getRegisteredDevices(int index) {
>         return (String)this.registeredPhones_.get(index);
>     }
>
>     public ByteString getRegisteredDevicesBytes(int index) {
>         return this.registeredPhones_.getByteString(index);
>     }
>
>     public final boolean isInitialized() {
>         byte isInitialized = this.memoizedIsInitialized;
>         if(isInitialized == 1) {
>             return true;
>         } else if(isInitialized == 0) {
>             return false;
>         } else {
>             this.memoizedIsInitialized = 1;
>             return true;
>         }
>     }
>
>     public void writeTo(CodedOutputStream output) throws IOException {
>         if(!this.getCustomerIdBytes().isEmpty()) {
>             GeneratedMessageV3.writeString(output, 1, this.customerId_);
>         }
>
>         if(this.eventTimestamp_ != null) {
>             output.writeMessage(2, this.getEventTimestamp());
>         }
>
>         if(!this.getNameBytes().isEmpty()) {
>             GeneratedMessageV3.writeString(output, 4, this.name_);
>         }
>
>         if(!this.getEmailBytes().isEmpty()) {
>             GeneratedMessageV3.writeString(output, 5, this.email_);
>         }
>
>         if(!this.getPhoneBytes().isEmpty()) {
>             GeneratedMessageV3.writeString(output, 6, this.phone_);
>         }
>
>         if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
>             GeneratedMessageV3.writeString(output, 7,
> this.deviceIdLoggedInFrom_);
>         }
>
>         for(int i = 0; i < this.registeredPhones_.size(); ++i) {
>             GeneratedMessageV3.writeString(output, 8,
> this.registeredPhones_.getRaw(i));
>         }
>
>     }
>
>     public int getSerializedSize() {
>         int size = this.memoizedSize;
>         if(size != -1) {
>             return size;
>         } else {
>             size = 0;
>             if(!this.getCustomerIdBytes().isEmpty()) {
>                 size += GeneratedMessageV3.computeStringSize(1,
> this.customerId_);
>             }
>
>             if(this.eventTimestamp_ != null) {
>                 size += CodedOutputStream.computeMessageSize(2,
> this.getEventTimestamp());
>             }
>
>             if(!this.getNameBytes().isEmpty()) {
>                 size += GeneratedMessageV3.computeStringSize(4,
> this.name_);
>             }
>
>             if(!this.getEmailBytes().isEmpty()) {
>                 size += GeneratedMessageV3.computeStringSize(5,
> this.email_);
>             }
>
>             if(!this.getPhoneBytes().isEmpty()) {
>                 size += GeneratedMessageV3.computeStringSize(6,
> this.phone_);
>             }
>
>             if(!this.getDeviceIdLoggedInFromBytes().isEmpty()) {
>                 size += GeneratedMessageV3.computeStringSize(7,
> this.deviceIdLoggedInFrom_);
>             }
>
>             int dataSize = 0;
>
>             for(int i = 0; i < this.registeredPhones_.size(); ++i) {
>                 dataSize += computeStringSizeNoTag(this.
> registeredPhones_.getRaw(i));
>             }
>
>             size += dataSize;
>             size += 1 * this.getRegisteredDevicesList().size();
>             this.memoizedSize = size;
>             return size;
>         }
>     }
>
>     public boolean equals(Object obj) {
>         if(obj == this) {
>             return true;
>         } else if(!(obj instanceof CustomerMessage)) {
>             return super.equals(obj);
>         } else {
>             CustomerMessage other = (CustomerMessage)obj;
>             boolean result = true;
>             result = result && this.getCustomerId().equals(
> other.getCustomerId());
>             result = result && this.hasEventTimestamp() ==
> other.hasEventTimestamp();
>             if(this.hasEventTimestamp()) {
>                 result = result && this.getEventTimestamp().equals(other.
> getEventTimestamp());
>             }
>
>             result = result && this.getName().equals(other.getName());
>             result = result && this.getEmail().equals(other.getEmail());
>             result = result && this.getPhone().equals(other.getPhone());
>             result = result && this.getDeviceIdLoggedInFrom()
> .equals(other.getDeviceIdLoggedInFrom());
>             result = result && this.getRegisteredDevicesList(
> ).equals(other.getRegisteredDevicesList());
>             return result;
>         }
>     }
>
>     public int hashCode() {
>         if(this.memoizedHashCode != 0) {
>             return this.memoizedHashCode;
>         } else {
>             byte hash = 41;
>             int hash1 = 19 * hash + this.getDescriptorForType().
> hashCode();
>             hash1 = 37 * hash1 + 1;
>             hash1 = 53 * hash1 + this.getCustomerId().hashCode();
>             if(this.hasEventTimestamp()) {
>                 hash1 = 37 * hash1 + 2;
>                 hash1 = 53 * hash1 + this.getEventTimestamp().hashCode();
>             }
>
>             hash1 = 37 * hash1 + 4;
>             hash1 = 53 * hash1 + this.getName().hashCode();
>             hash1 = 37 * hash1 + 5;
>             hash1 = 53 * hash1 + this.getEmail().hashCode();
>             hash1 = 37 * hash1 + 6;
>             hash1 = 53 * hash1 + this.getPhone().hashCode();
>             hash1 = 37 * hash1 + 7;
>             hash1 = 53 * hash1 + this.getDeviceIdLoggedInFrom()
> .hashCode();
>             if(this.getRegisteredDevicesCount() > 0) {
>                 hash1 = 37 * hash1 + 8;
>                 hash1 = 53 * hash1 + this.getRegisteredDevicesList(
> ).hashCode();
>             }
>
>             hash1 = 29 * hash1 + this.unknownFields.hashCode();
>             this.memoizedHashCode = hash1;
>             return hash1;
>         }
>     }
>
>     public static CustomerMessage parseFrom(ByteString data) throws
> InvalidProtocolBufferException {
>         return (CustomerMessage)PARSER.parseFrom(data);
>     }
>
>     public static CustomerMessage parseFrom(ByteString data,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
>         return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
>     }
>
>     public static CustomerMessage parseFrom(byte[] data) throws
> InvalidProtocolBufferException {
>         return (CustomerMessage)PARSER.parseFrom(data);
>     }
>
>     public static CustomerMessage parseFrom(byte[] data,
> ExtensionRegistryLite extensionRegistry) throws
> InvalidProtocolBufferException {
>         return (CustomerMessage)PARSER.parseFrom(data, extensionRegistry);
>     }
>
>     public static CustomerMessage parseFrom(InputStream input) throws
> IOException {
>         return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input);
>     }
>
>     public static CustomerMessage parseFrom(InputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
>         return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input, extensionRegistry);
>     }
>
>     public static CustomerMessage parseDelimitedFrom(InputStream input)
> throws IOException {
>         return (CustomerMessage)GeneratedMessageV3.
> parseDelimitedWithIOException(PARSER, input);
>     }
>
>     public static CustomerMessage parseDelimitedFrom(InputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
>         return (CustomerMessage)GeneratedMessageV3.
> parseDelimitedWithIOException(PARSER, input, extensionRegistry);
>     }
>
>     public static CustomerMessage parseFrom(CodedInputStream input) throws
> IOException {
>         return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input);
>     }
>
>     public static CustomerMessage parseFrom(CodedInputStream input,
> ExtensionRegistryLite extensionRegistry) throws IOException {
>         return (CustomerMessage)GeneratedMessageV3.parseWithIOException(PARSER,
> input, extensionRegistry);
>     }
>
>     public CustomerMessage.Builder newBuilderForType() {
>         return newBuilder();
>     }
>
>     public static CustomerMessage.Builder newBuilder() {
>         return DEFAULT_INSTANCE.toBuilder();
>     }
>
>     public static CustomerMessage.Builder newBuilder(CustomerMessage
> prototype) {
>         return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
>     }
>
>     public CustomerMessage.Builder toBuilder() {
>         return this == DEFAULT_INSTANCE?new CustomerMessage.Builder(null):(new
> CustomerMessage.Builder(null)).mergeFrom(this);
>     }
>
>     protected CustomerMessage.Builder newBuilderForType(BuilderParent
> parent) {
>         CustomerMessage.Builder builder = new CustomerMessage.Builder(parent,
> null);
>         return builder;
>     }
>
>     public static CustomerMessage getDefaultInstance() {
>         return DEFAULT_INSTANCE;
>     }
>
>     public static Parser<CustomerMessage> parser() {
>         return PARSER;
>     }
>
>     public Parser<CustomerMessage> getParserForType() {
>         return PARSER;
>     }
>
>     public CustomerMessage getDefaultInstanceForType() {
>         return DEFAULT_INSTANCE;
>     }
>
>     public static final class Builder extends com.google.protobuf.
> GeneratedMessageV3.Builder<CustomerMessage.Builder> implements
> CustomerMessageOrBuilder {
>         private int bitField0_;
>         private Object customerId_;
>         private Timestamp eventTimestamp_;
>         private SingleFieldBuilderV3<Timestamp,
> com.google.protobuf.Timestamp.Builder, TimestampOrBuilder>
> eventTimestampBuilder_;
>         private Object name_;
>         private Object email_;
>         private Object phone_;
>         private Object deviceIdLoggedInFrom_;
>         private LazyStringList registeredPhones_;
>
>         public static final Descriptor getDescriptor() {
>             return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
>         }
>
>         protected FieldAccessorTable internalGetFieldAccessorTable() {
>             return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_fieldAccessorTable
>                     .ensureFieldAccessorsInitialized(CustomerMessage.class, CustomerMessage.Builder.class);
>         }
>
>         private Builder() {
>             this.customerId_ = "";
>             this.eventTimestamp_ = null;
>             this.name_ = "";
>             this.email_ = "";
>             this.phone_ = "";
>             this.deviceIdLoggedInFrom_ = "";
>             this.registeredPhones_ = LazyStringArrayList.EMPTY;
>             this.maybeForceBuilderInitialization();
>         }
>
>         private Builder(BuilderParent parent) {
>             super(parent);
>             this.customerId_ = "";
>             this.eventTimestamp_ = null;
>             this.name_ = "";
>             this.email_ = "";
>             this.phone_ = "";
>             this.deviceIdLoggedInFrom_ = "";
>             this.registeredPhones_ = LazyStringArrayList.EMPTY;
>             this.maybeForceBuilderInitialization();
>         }
>
>         private void maybeForceBuilderInitialization() {
>             if(CustomerMessage.alwaysUseFieldBuilders) {
>                 ;
>             }
>
>         }
>
>         public CustomerMessage.Builder clear() {
>             super.clear();
>             this.customerId_ = "";
>             if(this.eventTimestampBuilder_ == null) {
>                 this.eventTimestamp_ = null;
>             } else {
>                 this.eventTimestamp_ = null;
>                 this.eventTimestampBuilder_ = null;
>             }
>
>             this.name_ = "";
>             this.email_ = "";
>             this.phone_ = "";
>             this.deviceIdLoggedInFrom_ = "";
>             this.registeredPhones_ = LazyStringArrayList.EMPTY;
>             this.bitField0_ &= -65;
>             return this;
>         }
>
>         public Descriptor getDescriptorForType() {
>             return CustomerLoginProto.internal_static_gojek_esb_customer_CustomerMessage_descriptor;
>         }
>
>         public CustomerMessage getDefaultInstanceForType() {
>             return CustomerMessage.DEFAULT_INSTANCE;
>         }
>
>         public CustomerMessage build() {
>             CustomerMessage result = this.buildPartial();
>             if(!result.isInitialized()) {
>                 throw newUninitializedMessageException(result);
>             } else {
>                 return result;
>             }
>         }
>
>         public CustomerMessage buildPartial() {
>             CustomerMessage result = new CustomerMessage(this, null);
>             int from_bitField0_ = this.bitField0_;
>             byte to_bitField0_ = 0;
>             result.customerId_ = this.customerId_;
>             if(this.eventTimestampBuilder_ == null) {
>                 result.eventTimestamp_ = this.eventTimestamp_;
>             } else {
>                 result.eventTimestamp_ = (Timestamp)this.
> eventTimestampBuilder_.build();
>             }
>
>             result.name_ = this.name_;
>             result.email_ = this.email_;
>             result.phone_ = this.phone_;
>             result.deviceIdLoggedInFrom_ = this.deviceIdLoggedInFrom_;
>             if((this.bitField0_ & 64) == 64) {
>                 this.registeredPhones_ = this.registeredPhones_.
> getUnmodifiableView();
>                 this.bitField0_ &= -65;
>             }
>
>             result.registeredPhones_ = this.registeredPhones_;
>             result.bitField0_ = to_bitField0_;
>             this.onBuilt();
>             return result;
>         }
>
>         public CustomerMessage.Builder clone() {
>             return (CustomerMessage.Builder)super.clone();
>         }
>
>         public CustomerMessage.Builder setField(FieldDescriptor field,
> Object value) {
>             return (CustomerMessage.Builder)super.setField(field, value);
>         }
>
>         public CustomerMessage.Builder clearField(FieldDescriptor field) {
>             return (CustomerMessage.Builder)super.clearField(field);
>         }
>
>         public CustomerMessage.Builder clearOneof(OneofDescriptor oneof) {
>             return (CustomerMessage.Builder)super.clearOneof(oneof);
>         }
>
>         public CustomerMessage.Builder setRepeatedField(FieldDescriptor
> field, int index, Object value) {
>             return (CustomerMessage.Builder)super.setRepeatedField(field,
> index, value);
>         }
>
>         public CustomerMessage.Builder addRepeatedField(FieldDescriptor
> field, Object value) {
>             return (CustomerMessage.Builder)super.addRepeatedField(field,
> value);
>         }
>
>         public CustomerMessage.Builder mergeFrom(Message other) {
>             if(other instanceof CustomerMessage) {
>                 return this.mergeFrom((CustomerMessage)other);
>             } else {
>                 super.mergeFrom(other);
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder mergeFrom(CustomerMessage other) {
>             if(other == CustomerMessage.DEFAULT_INSTANCE) {
>                 return this;
>             } else {
>                 if(!other.getCustomerId().isEmpty()) {
>                     this.customerId_ = other.customerId_;
>                     this.onChanged();
>                 }
>
>                 if(other.hasEventTimestamp()) {
>                     this.mergeEventTimestamp(other.getEventTimestamp());
>                 }
>
>                 if(!other.getName().isEmpty()) {
>                     this.name_ = other.name_;
>                     this.onChanged();
>                 }
>
>                 if(!other.getEmail().isEmpty()) {
>                     this.email_ = other.email_;
>                     this.onChanged();
>                 }
>
>                 if(!other.getPhone().isEmpty()) {
>                     this.phone_ = other.phone_;
>                     this.onChanged();
>                 }
>
>                 if(!other.getDeviceIdLoggedInFrom().isEmpty()) {
>                     this.deviceIdLoggedInFrom_ =
> other.deviceIdLoggedInFrom_;
>                     this.onChanged();
>                 }
>
>                 if(!other.registeredPhones_.isEmpty()) {
>                     if(this.registeredPhones_.isEmpty()) {
>                         this.registeredPhones_ = other.registeredPhones_;
>                         this.bitField0_ &= -65;
>                     } else {
>                         this.ensureRegisteredDevicesIsMutable();
>                         this.registeredPhones_.addAll(
> other.registeredPhones_);
>                     }
>
>                     this.onChanged();
>                 }
>
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public final boolean isInitialized() {
>             return true;
>         }
>
>         public CustomerMessage.Builder mergeFrom(CodedInputStream input, ExtensionRegistryLite extensionRegistry) throws IOException {
>             CustomerMessage parsedMessage = null;
>
>             try {
>                 parsedMessage = (CustomerMessage)CustomerMessage.PARSER.parsePartialFrom(input, extensionRegistry);
>             } catch (InvalidProtocolBufferException var8) {
>                 parsedMessage = (CustomerMessage)var8.getUnfinishedMessage();
>                 throw var8.unwrapIOException();
>             } finally {
>                 if(parsedMessage != null) {
>                     this.mergeFrom(parsedMessage);
>                 }
>
>             }
>
>             return this;
>         }
>
>         public String getCustomerId() {
>             Object ref = this.customerId_;
>             if(!(ref instanceof String)) {
>                 ByteString bs = (ByteString)ref;
>                 String s = bs.toStringUtf8();
>                 this.customerId_ = s;
>                 return s;
>             } else {
>                 return (String)ref;
>             }
>         }
>
>         public ByteString getCustomerIdBytes() {
>             Object ref = this.customerId_;
>             if(ref instanceof String) {
>                 ByteString b = ByteString.copyFromUtf8((String)ref);
>                 this.customerId_ = b;
>                 return b;
>             } else {
>                 return (ByteString)ref;
>             }
>         }
>
>         public CustomerMessage.Builder setCustomerId(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.customerId_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder clearCustomerId() {
>             this.customerId_ = CustomerMessage.DEFAULT_INSTANCE.getCustomerId();
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder setCustomerIdBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.customerId_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public boolean hasEventTimestamp() {
>             return this.eventTimestampBuilder_ != null || this.eventTimestamp_ != null;
>         }
>
>         public Timestamp getEventTimestamp() {
>             return this.eventTimestampBuilder_ == null ? (this.eventTimestamp_ == null ? Timestamp.getDefaultInstance() : this.eventTimestamp_) : (Timestamp)this.eventTimestampBuilder_.getMessage();
>         }
>
>         public CustomerMessage.Builder setEventTimestamp(Timestamp value) {
>             if(this.eventTimestampBuilder_ == null) {
>                 if(value == null) {
>                     throw new NullPointerException();
>                 }
>
>                 this.eventTimestamp_ = value;
>                 this.onChanged();
>             } else {
>                 this.eventTimestampBuilder_.setMessage(value);
>             }
>
>             return this;
>         }
>
>         public CustomerMessage.Builder setEventTimestamp(com.google.protobuf.Timestamp.Builder builderForValue) {
>             if(this.eventTimestampBuilder_ == null) {
>                 this.eventTimestamp_ = builderForValue.build();
>                 this.onChanged();
>             } else {
>                 this.eventTimestampBuilder_.setMessage(builderForValue.build());
>             }
>
>             return this;
>         }
>
>         public CustomerMessage.Builder mergeEventTimestamp(Timestamp value) {
>             if(this.eventTimestampBuilder_ == null) {
>                 if(this.eventTimestamp_ != null) {
>                     this.eventTimestamp_ = Timestamp.newBuilder(this.eventTimestamp_).mergeFrom(value).buildPartial();
>                 } else {
>                     this.eventTimestamp_ = value;
>                 }
>
>                 this.onChanged();
>             } else {
>                 this.eventTimestampBuilder_.mergeFrom(value);
>             }
>
>             return this;
>         }
>
>         public CustomerMessage.Builder clearEventTimestamp() {
>             if(this.eventTimestampBuilder_ == null) {
>                 this.eventTimestamp_ = null;
>                 this.onChanged();
>             } else {
>                 this.eventTimestamp_ = null;
>                 this.eventTimestampBuilder_ = null;
>             }
>
>             return this;
>         }
>
>         public com.google.protobuf.Timestamp.Builder getEventTimestampBuilder() {
>             this.onChanged();
>             return (com.google.protobuf.Timestamp.Builder)this.getEventTimestampFieldBuilder().getBuilder();
>         }
>
>         public TimestampOrBuilder getEventTimestampOrBuilder() {
>             return (TimestampOrBuilder)(this.eventTimestampBuilder_ != null ? (TimestampOrBuilder)this.eventTimestampBuilder_.getMessageOrBuilder() : (this.eventTimestamp_ == null ? Timestamp.getDefaultInstance() : this.eventTimestamp_));
>         }
>
>         private SingleFieldBuilderV3<Timestamp, com.google.protobuf.Timestamp.Builder, TimestampOrBuilder> getEventTimestampFieldBuilder() {
>             if(this.eventTimestampBuilder_ == null) {
>                 this.eventTimestampBuilder_ = new SingleFieldBuilderV3(this.getEventTimestamp(), this.getParentForChildren(), this.isClean());
>                 this.eventTimestamp_ = null;
>             }
>
>             return this.eventTimestampBuilder_;
>         }
>
>         public String getName() {
>             Object ref = this.name_;
>             if(!(ref instanceof String)) {
>                 ByteString bs = (ByteString)ref;
>                 String s = bs.toStringUtf8();
>                 this.name_ = s;
>                 return s;
>             } else {
>                 return (String)ref;
>             }
>         }
>
>         public ByteString getNameBytes() {
>             Object ref = this.name_;
>             if(ref instanceof String) {
>                 ByteString b = ByteString.copyFromUtf8((String)ref);
>                 this.name_ = b;
>                 return b;
>             } else {
>                 return (ByteString)ref;
>             }
>         }
>
>         public CustomerMessage.Builder setName(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.name_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder clearName() {
>             this.name_ = CustomerMessage.DEFAULT_INSTANCE.getName();
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder setNameBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.name_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public String getEmail() {
>             Object ref = this.email_;
>             if(!(ref instanceof String)) {
>                 ByteString bs = (ByteString)ref;
>                 String s = bs.toStringUtf8();
>                 this.email_ = s;
>                 return s;
>             } else {
>                 return (String)ref;
>             }
>         }
>
>         public ByteString getEmailBytes() {
>             Object ref = this.email_;
>             if(ref instanceof String) {
>                 ByteString b = ByteString.copyFromUtf8((String)ref);
>                 this.email_ = b;
>                 return b;
>             } else {
>                 return (ByteString)ref;
>             }
>         }
>
>         public CustomerMessage.Builder setEmail(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.email_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder clearEmail() {
>             this.email_ = CustomerMessage.DEFAULT_INSTANCE.getEmail();
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder setEmailBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.email_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public String getPhone() {
>             Object ref = this.phone_;
>             if(!(ref instanceof String)) {
>                 ByteString bs = (ByteString)ref;
>                 String s = bs.toStringUtf8();
>                 this.phone_ = s;
>                 return s;
>             } else {
>                 return (String)ref;
>             }
>         }
>
>         public ByteString getPhoneBytes() {
>             Object ref = this.phone_;
>             if(ref instanceof String) {
>                 ByteString b = ByteString.copyFromUtf8((String)ref);
>                 this.phone_ = b;
>                 return b;
>             } else {
>                 return (ByteString)ref;
>             }
>         }
>
>         public CustomerMessage.Builder setPhone(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.phone_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder clearPhone() {
>             this.phone_ = CustomerMessage.DEFAULT_INSTANCE.getPhone();
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder setPhoneBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.phone_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public String getDeviceIdLoggedInFrom() {
>             Object ref = this.deviceIdLoggedInFrom_;
>             if(!(ref instanceof String)) {
>                 ByteString bs = (ByteString)ref;
>                 String s = bs.toStringUtf8();
>                 this.deviceIdLoggedInFrom_ = s;
>                 return s;
>             } else {
>                 return (String)ref;
>             }
>         }
>
>         public ByteString getDeviceIdLoggedInFromBytes() {
>             Object ref = this.deviceIdLoggedInFrom_;
>             if(ref instanceof String) {
>                 ByteString b = ByteString.copyFromUtf8((String)ref);
>                 this.deviceIdLoggedInFrom_ = b;
>                 return b;
>             } else {
>                 return (ByteString)ref;
>             }
>         }
>
>         public CustomerMessage.Builder setDeviceIdLoggedInFrom(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.deviceIdLoggedInFrom_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder clearDeviceIdLoggedInFrom() {
>             this.deviceIdLoggedInFrom_ = CustomerMessage.DEFAULT_INSTANCE.getDeviceIdLoggedInFrom();
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder setDeviceIdLoggedInFromBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.deviceIdLoggedInFrom_ = value;
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         private void ensureRegisteredDevicesIsMutable() {
>             if((this.bitField0_ & 64) != 64) {
>                 this.registeredPhones_ = new LazyStringArrayList(this.registeredPhones_);
>                 this.bitField0_ |= 64;
>             }
>
>         }
>
>         public ProtocolStringList getRegisteredDevicesList() {
>             return this.registeredPhones_.getUnmodifiableView();
>         }
>
>         public int getRegisteredDevicesCount() {
>             return this.registeredPhones_.size();
>         }
>
>         public String getRegisteredDevices(int index) {
>             return (String)this.registeredPhones_.get(index);
>         }
>
>         public ByteString getRegisteredDevicesBytes(int index) {
>             return this.registeredPhones_.getByteString(index);
>         }
>
>         public CustomerMessage.Builder setRegisteredDevices(int index, String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.ensureRegisteredDevicesIsMutable();
>                 this.registeredPhones_.set(index, value);
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder addRegisteredDevices(String value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 this.ensureRegisteredDevicesIsMutable();
>                 this.registeredPhones_.add(value);
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public CustomerMessage.Builder addAllRegisteredDevices(Iterable<String> values) {
>             this.ensureRegisteredDevicesIsMutable();
>             com.google.protobuf.AbstractMessageLite.Builder.addAll(values, this.registeredPhones_);
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder clearRegisteredDevices() {
>             this.registeredPhones_ = LazyStringArrayList.EMPTY;
>             this.bitField0_ &= -65;
>             this.onChanged();
>             return this;
>         }
>
>         public CustomerMessage.Builder addRegisteredDevicesBytes(ByteString value) {
>             if(value == null) {
>                 throw new NullPointerException();
>             } else {
>                 CustomerMessage.checkByteStringIsUtf8(value);
>                 this.ensureRegisteredDevicesIsMutable();
>                 this.registeredPhones_.add(value);
>                 this.onChanged();
>                 return this;
>             }
>         }
>
>         public final CustomerMessage.Builder setUnknownFields(UnknownFieldSet unknownFields) {
>             return this;
>         }
>
>         public final CustomerMessage.Builder mergeUnknownFields(UnknownFieldSet unknownFields) {
>             return this;
>         }
>     }
> }
>
>
>
> On Fri, Aug 4, 2017 at 8:07 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Can you show how CustomerMessage is defined?
>>
>> Thanks
>>
>> On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <fl...@gmail.com>
>> wrote:
>>
>>> Folks,
>>>
>>> I wrote a custom data source to test my CEP logic. The custom data
>>> source looks like:
>>>
>>> public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
>>>     private volatile boolean running = true;
>>>     private final Random random;
>>>
>>>     public CustomerDataSource() {
>>>         this.random = new Random();
>>>     }
>>>
>>>     @Override
>>>     public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>>>         while (running) {
>>>             new CustomerDataGen().generateMessages().
>>>                     forEach(element -> ctx.collect(element));
>>>
>>>             Thread.sleep(10000);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         running = false;
>>>     }
>>> }
>>>
>>> public class CustomerDataGen {
>>>     private final Random random;
>>>
>>>     public CustomerDataGen() {
>>>         this.random = new Random();
>>>     }
>>>
>>>     public List<CustomerMessage> generateMessages() throws InterruptedException {
>>>         List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>>>
>>>         messages.add(getMessage());
>>>         return messages;
>>>     }
>>>
>>>     private CustomerMessage getMessage() {
>>>         Instant time = Instant.now();
>>>         Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
>>>         Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
>>>         return CustomerMessage.newBuilder().
>>>                 setName("SomeCustomer").
>>>                 setEventTimestamp(eventTimeStamp).
>>>                 setCustomerId("01234").
>>>                 addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>>>                 setEmail("customer@foo.com").
>>>                 build();
>>>     }
>>> }
>>>
>>> In my main program:
>>>
>>> .........
>>> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
>>> env.addSource(new CustomerDataSource());
>>>
>>> env.execute();
>>>
>>>
>>> When I run the program, I get the following exception:
>>>
>>>
>>> Caused by: java.lang.NullPointerException
>>> 	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
>>> 	at java.util.AbstractList.add(AbstractList.java:108)
>>> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
>>> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
>>> 	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>>> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
>>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
>>> 	at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
>>> 	at java.util.ArrayList.forEach(ArrayList.java:1249)
>>> 	at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>>> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>>> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
>>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>> 	at java.lang.Thread.run(Thread.java:745)
>>>
>>> I am having a tough time figuring out why. Can someone help me see where I am going wrong?
>>>
>>>
>>>
>>
>

Re: Null pointer exception while trying to serialize a protobuf message

Posted by Ted Yu <yu...@gmail.com>.
Can you show how CustomerMessage is defined?

Thanks

On Fri, Aug 4, 2017 at 7:22 AM, Sridhar Chellappa <fl...@gmail.com>
wrote:

> Folks,
>
> I wrote a custom data source to test my CEP logic. The custom data source
> looks like:
>
> public class CustomerDataSource extends RichParallelSourceFunction<CustomerMessage> {
>     private volatile boolean running = true;
>     private final Random random;
>
>     public CustomerDataSource() {
>         this.random = new Random();
>     }
>
>     @Override
>     public void run(SourceContext<CustomerMessage> ctx) throws Exception {
>         while (running) {
>             new CustomerDataGen().generateMessages().
>                     forEach(element -> ctx.collect(element));
>
>             Thread.sleep(10000);
>         }
>     }
>
>     @Override
>     public void cancel() {
>         running = false;
>     }
> }
>
> public class CustomerDataGen {
>     private final Random random;
>
>     public CustomerDataGen() {
>         this.random = new Random();
>     }
>
>     public List<CustomerMessage> generateMessages() throws InterruptedException {
>         List<CustomerMessage> messages = new ArrayList<CustomerMessage>();
>
>         messages.add(getMessage());
>         return messages;
>     }
>
>     private CustomerMessage getMessage() {
>         Instant time = Instant.now();
>         Timestamp eventTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(0).build();
>         Timestamp creationTimeStamp = Timestamp.newBuilder().setSeconds(time.getEpochSecond() -1).setNanos(0).build();
>         return CustomerMessage.newBuilder().
>                 setName("SomeCustomer").
>                 setEventTimestamp(eventTimeStamp).
>                 setCustomerId("01234").
>                 addAllRegisteredPhoneNumbers(Arrays.asList("+9185216741972")).
>                 setEmail("customer@foo.com").
>                 build();
>     }
> }
>
> In my main program:
>
> .........
> env.getConfig().registerTypeWithKryoSerializer(CustomerMessage.class, ProtobufSerializer.class);
> env.addSource(new CustomerDataSource());
>
> env.execute();
>
>
> When I run the program, I get the following exception:
>
>
> Caused by: java.lang.NullPointerException
> 	at shaded.com.google.protobuf.UnmodifiableLazyStringList.size(UnmodifiableLazyStringList.java:68)
> 	at java.util.AbstractList.add(AbstractList.java:108)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
> 	at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
> 	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:236)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
> 	at com.sample.datasources.custom.CustomertDataSource.lambda$run$0(CustomerDataSource.java:24)
> 	at java.util.ArrayList.forEach(ArrayList.java:1249)
> 	at com.sample.datasources.custom.CustomertDataSource.run(CustomerDataSource.java:24)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
>
> I am having a tough time figuring out why. Can someone help me see where I am going wrong?
>
>
>
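
For anyone reading the trace above, here is one possible reading of its top three frames, as a sketch and not a confirmed diagnosis. The assumption, not verified against the Kryo sources, is that CollectionSerializer.copy creates an empty instance of the list's class without running its constructor and then calls add() for each element. AbstractList.add(e) calls add(size(), e), so a wrapper whose delegate field was never set would throw in size(). The class ReadOnlyView and the method genericCopyThrowsNpe below are hypothetical stand-ins, not protobuf or Kryo classes:

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnmodifiableCopySketch {

    /** Hypothetical stand-in for a read-only list view that delegates to a wrapped list. */
    static class ReadOnlyView extends AbstractList<String> {
        private List<String> delegate; // stays null if no constructor logic sets it

        /** Normal construction path: the delegate is always set. */
        ReadOnlyView(List<String> delegate) {
            this.delegate = delegate;
        }

        /** Simulates an instance created without running the real constructor. */
        private ReadOnlyView() {
        }

        static ReadOnlyView uninitialized() {
            return new ReadOnlyView();
        }

        @Override
        public String get(int index) {
            return delegate.get(index);
        }

        @Override
        public int size() {
            return delegate.size(); // throws NullPointerException when delegate is null
        }
    }

    /** Generic element-by-element copy: take an empty instance, then add() each element. */
    static boolean genericCopyThrowsNpe(List<String> original) {
        List<String> copy = ReadOnlyView.uninitialized();
        try {
            for (String element : original) {
                copy.add(element); // AbstractList.add(e) evaluates size() first
            }
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> phones =
                new ReadOnlyView(new ArrayList<>(Arrays.asList("+9185216741972")));
        System.out.println("generic copy throws NPE: " + genericCopyThrowsNpe(phones));
    }
}
```

If that reading is right, the frames PojoSerializer.copy -> KryoSerializer.copy -> CollectionSerializer.copy suggest the repeated string field is being copied by Kryo's default collection handling and not by the registered ProtobufSerializer, so it may be worth checking which type Flink actually infers for the stream.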