Posted to user@flink.apache.org by Ahmed Nader <ah...@gmail.com> on 2016/06/08 13:40:14 UTC

com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Hello,
I have a TwitterSource and I'm applying some transformations, such as filter
and map, to the resulting stream from Twitter. I'm collecting the output in
an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
parallel thread, I periodically check iterator.hasNext() and print the next
item. I'm using Flink 1.0.3.
The program works at first and actually prints some items. However, when I
leave it running for a while longer (for example, 40 seconds or 1 minute),
I get two exceptions:
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
Both exceptions are thrown from the line where I check whether the iterator
hasNext().

I'd like to know why these exceptions happen in general, and if anyone
knows a specific solution for my program, that would be great too.
Thanks,
Ahmed

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Posted by Ahmed Nader <ah...@gmail.com>.
Hello Till,
Thanks so much for your reply. Here's my program.
This is the TwitterSource:

public class TwitterSource extends Stream {
    private static final long serialVersionUID = 1L;
    protected transient BlockingQueue<String> queue;
    protected int queueSize = 10000;
    private transient BasicClient client;
    private int waitSec = 5;

    private int maxNumberOfTweets;
    private int currentNumberOfTweets;

    private volatile boolean isRunning = true;

    public TwitterSource(int numberOfTweets) {
        this.maxNumberOfTweets = numberOfTweets;
        currentNumberOfTweets = 0;
    }

    public void initializeConnection() {
        queue = new LinkedBlockingQueue<>(queueSize);

        UserstreamEndpoint endpoint = new UserstreamEndpoint();
        endpoint.stallWarnings(false);

        Authentication auth = authenticate();

        initializeClient(endpoint, auth);

    }

    public OAuth1 authenticate() {
        return new OAuth1("---");
    }

    protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
        client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.USERSTREAM_HOST)
                .endpoint(endpoint).authentication(auth)
                .processor(new StringDelimitedProcessor(queue)).build();

        client.connect();
    }

    @Override
    public void run(SourceContext<Object> sourceContext) throws Exception {
        initializeConnection();
        while (isRunning) {
            sourceContext.collect(queue.take());
            currentNumberOfTweets++;
            if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
                break;
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
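The run() loop above ends in one of three ways: cancel() clears isRunning, the tweet limit is reached, or the blocking queue.take() is interrupted. Because take() waits while the queue is empty, a cancel() that only clears the flag takes effect once the next tweet arrives, unless the thread is interrupted. Note also that with new TwitterSource(30) the loop sleeps one second after each of the first 29 tweets, so the source emits at most 30 tweets and returns after at least 29 seconds; nothing in this thread establishes whether that is related to when the exceptions appear. The following Flink-free sketch models the same loop with poll() and a timeout. BoundedQueueLoop and its parameters are hypothetical names, and the 100 ms poll interval is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of TwitterSource.run(): emits queued items until
// cancel() is called or maxItems items have been emitted (-1 means unbounded).
public class BoundedQueueLoop {
    private volatile boolean isRunning = true;

    public void cancel() {
        isRunning = false;
    }

    public int run(BlockingQueue<String> queue, Consumer<String> emit,
                   int maxItems, long pauseMillis) {
        int emitted = 0;
        try {
            while (isRunning) {
                // poll() with an (assumed) 100 ms timeout instead of take(), so a
                // cleared isRunning flag is noticed even when no new item arrives
                String item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item == null) {
                    continue;
                }
                emit.accept(item);
                emitted++;
                if (maxItems != -1 && emitted >= maxItems) {
                    break;
                }
                if (pauseMillis > 0) {
                    Thread.sleep(pauseMillis); // the original sleeps 1000 ms here
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag and stop emitting
            Thread.currentThread().interrupt();
        }
        return emitted;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("tweet-" + i);
        }
        List<String> out = new ArrayList<>();
        int n = new BoundedQueueLoop().run(queue, out::add, 3, 0);
        System.out.println(n + " " + out); // 3 [tweet-0, tweet-1, tweet-2]
    }
}
```

In the Flink source itself, the emit callback corresponds to sourceContext.collect(...), and pauseMillis to the one-second Thread.sleep.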

Then I use the TwitterSource like this:

List<Object> globalEntities = new ArrayList<>();
Iterator<Object> iterator;

public void runModel(String key) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Object> twitter = env.addSource(new TwitterSource(30)).flatMap(new processTweets());
    twitter.filter(new FilterFunction<Object>() {

        @Override
        public boolean filter(Object tweet) throws Exception {
            Tweet singleTweet = (Tweet) tweet;
            return singleTweet.search(key);
        }
    }).print();

    iterator = DataStreamUtils.collect(datastream);
}

// this method is called periodically with an Ajax call every 2 seconds
public void viewResults(Model model) {

    if (iterator != null) {
        if (iterator.hasNext()) {
            globalEntities.add(iterator.next());
        }
    }
    if (!globalEntities.isEmpty()) {
        model.addAttribute("list", globalEntities);
    }

}

public static class processTweets extends JSONParseFlatMap<Object, Object> {
    @Override
    public void flatMap(Object value, Collector<Object> out) throws Exception {
        try {
            //if (getString((String)value, "user.lang").equals("en")) {
                Tweet tweet = new Tweet();
                // message of tweet
                tweet.setText(getString((String) value, "text"));
                tweet.setUser(getString((String) value, "user.name"));
                out.collect(tweet);
           // }
        } catch (JSONException e) {
            // the JSON was not parsed correctly
        }
    }
}

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Tweet {
    private String user;
    private String text;

    public Tweet() {

    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String toString() {
        return this.user+" : "+this.text;
    }

    public boolean search(String key) {
        String patternString = ".*"+key+".*";
        Pattern pattern = Pattern.compile(patternString, Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(this.toString());
        return matcher.find();
    }
}
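Tweet.search() above splices the key straight into a regular expression, so a key containing regex metacharacters changes what matches (a key of "." matches everything), and an unbalanced one such as "(" makes Pattern.compile throw a PatternSyntaxException. This is separate from the Kryo errors, but a minimal sketch of a literal, case-insensitive search with Pattern.quote follows. LiteralSearch and containsIgnoreCase are hypothetical names, and adding UNICODE_CASE (so non-ASCII letters also fold case) is an assumption about the desired behavior.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical variant of Tweet.search(): the key is matched as literal text.
public class LiteralSearch {
    public static boolean containsIgnoreCase(String text, String key) {
        // Pattern.quote() escapes metacharacters such as '(', '+', '.' and '?';
        // find() already searches anywhere in the text, so no ".*" is needed.
        // UNICODE_CASE extends case folding beyond ASCII (an assumed requirement).
        Pattern pattern = Pattern.compile(Pattern.quote(key),
                Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE);
        return pattern.matcher(text).find();
    }

    public static void main(String[] args) {
        String tweet = "BuzzFeed News : Maria Sharapova says that she will fight back";
        System.out.println(containsIgnoreCase(tweet, "sharapova")); // true
        System.out.println(containsIgnoreCase("I like C++", "c++"));  // true
        System.out.println(containsIgnoreCase("no parens", "("));     // false

        try {
            // the original construction, ".*" + key + ".*", with an unbalanced '('
            Pattern.compile(".*" + "(" + ".*", Pattern.CASE_INSENSITIVE);
        } catch (PatternSyntaxException e) {
            System.out.println("unquoted key rejected: " + e.getDescription());
        }
    }
}
```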

*And here is the log output:*
2016-06-08 17:20:10.091  INFO 13564 --- [om Source (1/1)] o.apache.flink.runtime.taskmanager.Task  : Source: Custom Source (1/1) switched to RUNNING
2016-06-08 17:20:10.096  INFO 13564 --- [lt-dispatcher-2] o.a.f.r.executiongraph.ExecutionGraph    : Sink: Unnamed (1/1) (50d42a893093705f278bd0aa099a53d3) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-5] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (4/4) (3c98bbdab04256d73f1f405669d007a8) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-3] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (3/4) (49c23924bae61cead7158bc817c22d0b) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-4] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (2/4) (7cfc6b655d7bb70beb901012094db0e5) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-7] o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink: Unnamed (1/4) (6afafe555579dc31e1c974c7238d486c) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING
06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING
2016-06-08 17:20:10.100  INFO 13564 --- [lt-dispatcher-9] o.a.f.r.executiongraph.ExecutionGraph    : Source: Custom Source (1/1) (92f08ef0a7d715478bf9fe60e8bc4dea) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.107  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING
2016-06-08 17:20:10.108  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING
06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING
2016-06-08 17:20:10.124  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.125  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.129  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.132  WARN 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.136  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.136  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.137  WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.137  INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.138  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.138  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.139  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.139  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.140  WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.140  INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.148  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.148  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.149  WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.149  INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.150  WARN 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.150  INFO 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.158  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.159  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.159  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.160  WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : No state backend has been specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap memory (checkpoint to jobmanager)
2016-06-08 17:20:10.359  INFO 13564 --- [om Source (1/1)] com.twitter.hbc.httpclient.BasicClient   : New connection executed: twitterSourceClient, endpoint: /1.1/user.json?delimited=length
2016-06-08 17:20:10.622  INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Establishing a connection
2016-06-08 17:20:11.687  INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Processing connection data

*Then some results are printed:*

2> Fast Company : Homemade brings an Etsy mindset to food https://t.co/9uaOTPka58 https://t.co/VB7IFIxrdM
3> tagesthemen : Noch immer #Flüchtlinge auf der #Balkanroute. An der Grenze zu #Ungarn wartet Stacheldraht - der hat aber Löcher. https://t.co/YHW25gnTzA
4> BuzzFeed News : Maria Sharapova says that she will fight back against the ITF's decision to suspend her https://t.co/DuNhDUv64f https://t.co/3BZcfR9Vid

*Then the exceptions are thrown:*

2016-06-08 17:20:42.326 ERROR 13564 --- [io-8080-exec-10] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97] with root cause

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[kryo-2.24.0.jar:na]
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar:na]
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar:na]
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3]
	at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
	at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
	at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na]
	at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na]
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73]
	at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522)
	AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

java.lang.IndexOutOfBoundsException: Index: 99, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:653) ~[na:1.8.0_73]
	at java.util.ArrayList.get(ArrayList.java:429) ~[na:1.8.0_73]
	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[kryo-2.24.0.jar:na]
	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) ~[kryo-2.24.0.jar:na]
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) ~[kryo-2.24.0.jar:na]
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3]
	at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
	at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2]
	at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na]
	at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na]
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73]
	at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73]
	at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE]
	at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

Please note that both exceptions point to the line where I check the
condition if (iterator.hasNext()).
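The traces show hasNext() being called from viewResults() on a Tomcat request thread (the first one is logged from io-8080-exec-10), and viewResults() runs once per Ajax call every 2 seconds. If hasNext() blocks while waiting for the next element, a slow request can still be running when the next Ajax call arrives on another pool thread. Kryo instances are not thread-safe, and nothing in this thread indicates that DataStreamIterator synchronizes access, so one possibility worth ruling out (not confirmed here) is two requests reading from the same iterator at once, leaving the deserializer at the wrong position in the byte stream. The traces also show flink-streaming-contrib 0.10.2 next to flink-core 1.0.3, which is another thing worth aligning. A minimal JDK-only sketch, using a hypothetical IteratorDrainer helper, keeps every iterator call on one background thread and lets request handlers read a copy of the results:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper: exactly one background thread calls hasNext()/next() on the
// (possibly blocking, not thread-safe) iterator; other threads only read copies.
public class IteratorDrainer<T> {
    private final List<T> results = new CopyOnWriteArrayList<>();
    private final Thread worker;
    private volatile Throwable failure;

    public IteratorDrainer(Iterator<T> source) {
        worker = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    results.add(source.next());
                }
            } catch (RuntimeException e) {
                failure = e; // e.g. an unchecked KryoException; kept for reporting
            }
        }, "iterator-drainer");
        worker.setDaemon(true); // do not keep the JVM alive just for this thread
    }

    public void start() {
        worker.start();
    }

    // Safe to call from any request thread, e.g. inside viewResults(...).
    public List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(results));
    }

    public Throwable failure() {
        return failure;
    }

    // Waits until the source is exhausted; mainly useful in tests.
    public void awaitCompletion() {
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("tweet-1", "tweet-2", "tweet-3").iterator();
        IteratorDrainer<String> drainer = new IteratorDrainer<>(source);
        drainer.start();
        drainer.awaitCompletion();
        System.out.println(drainer.snapshot()); // [tweet-1, tweet-2, tweet-3]
    }
}
```

Under this approach, the drainer would be created and started once, right after DataStreamUtils.collect(...), and viewResults() would only call model.addAttribute("list", drainer.snapshot()) without touching the iterator.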

Thanks,
Ahmed




On 8 June 2016 at 16:07, Till Rohrmann <tr...@apache.org> wrote:

> Hi Ahmed,
>
> the problem usually occurs if you use differently initialized Kryo
> instances, where one instance has a different set of classes registered. But
> your data could also be corrupted, because you see an
> IndexOutOfBoundsException where you try to access an element of an array
> with size 0 at index 32.
>
> In order to debug the problem, it would be helpful to see the full stack
> traces of the errors and the complete error messages. Additionally, it would
> be helpful to see your program so that we could try to reproduce the
> problem.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ah...@gmail.com>
> wrote:
>
>> Hello,
>> I have a TwitterSource and I'm applying some transformations, such as filter
>> and map, to the resulting stream from Twitter. I'm collecting the output in
>> an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
>> parallel thread, I periodically check iterator.hasNext() and print the next
>> item. I'm using Flink 1.0.3.
>> The program works at first and actually prints some items. However, when I
>> leave it running for a while longer (for example, 40 seconds or 1 minute),
>> I get two exceptions:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
>> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>> Both exceptions are thrown from the line where I check whether the iterator
>> hasNext().
>>
>> I'd like to know why these exceptions happen in general, and if anyone
>> knows a specific solution for my program, that would be great too.
>> Thanks,
>> Ahmed
>>
>
>

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ahmed,

the problem usually occurs if you use differently initialized Kryo
instances, where one instance has a different set of classes registered. But
your data could also be corrupted, because you see an
IndexOutOfBoundsException where you try to access an element of an array
with size 0 at index 32.

In order to debug the problem, it would be helpful to see the full stack
traces of the errors and the complete error messages. Additionally, it would
be helpful to see your program so that we could try to reproduce the
problem.

Cheers,
Till
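Till's first explanation can be pictured with a deliberately simplified model. This is plain JDK code, not Kryo's actual classes, and RegistrationModel and Registry are hypothetical names. The writer and the reader each number their registered classes in registration order, and only the number travels over the wire. If the reader lacks a registration, it fails much like "Encountered unregistered class ID"; if the order differs, it silently picks the wrong class. A reader positioned at the wrong offset in the byte stream runs into the same kind of errors, because arbitrary bytes are then read as IDs; the MapReferenceResolver failure in the traces (reference index 99 into an empty table) looks like the back-reference version of that. In Flink, types are normally registered once on the job's ExecutionConfig (for example env.getConfig().registerKryoType(Tweet.class)), so that serializers created from that config share the same registrations.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of ID-based class registration (NOT Kryo's API):
// each side assigns consecutive integer IDs to classes in registration order,
// and only the ID is written to the byte stream.
public class RegistrationModel {
    public static final class Registry {
        private final List<Class<?>> byId = new ArrayList<>();

        public Registry register(Class<?> type) {
            byId.add(type); // ID = position in registration order
            return this;
        }

        public int idOf(Class<?> type) {
            int id = byId.indexOf(type);
            if (id < 0) {
                throw new IllegalArgumentException("Class is not registered: " + type.getName());
            }
            return id;
        }

        public Class<?> classOf(int id) {
            if (id < 0 || id >= byId.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return byId.get(id);
        }
    }

    public static void main(String[] args) {
        Registry writer = new Registry().register(String.class).register(Integer.class);
        int wireId = writer.idOf(Integer.class); // 1

        Registry sameOrder = new Registry().register(String.class).register(Integer.class);
        Registry otherOrder = new Registry().register(Integer.class).register(String.class);
        Registry missing = new Registry().register(String.class);

        System.out.println(sameOrder.classOf(wireId));  // class java.lang.Integer
        System.out.println(otherOrder.classOf(wireId)); // class java.lang.String (silently wrong)
        try {
            missing.classOf(wireId);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());         // Encountered unregistered class ID: 1
        }
    }
}
```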

On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ah...@gmail.com> wrote:

> Hello,
> I have a TwitterSource and I'm applying some transformations, such as filter
> and map, to the resulting stream from Twitter. I'm collecting the output in
> an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
> parallel thread, I periodically check iterator.hasNext() and print the next
> item. I'm using Flink 1.0.3.
> The program works at first and actually prints some items. However, when I
> leave it running for a while longer (for example, 40 seconds or 1 minute),
> I get two exceptions:
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
> Both exceptions are thrown from the line where I check whether the iterator
> hasNext().
>
> I'd like to know why these exceptions happen in general, and if anyone
> knows a specific solution for my program, that would be great too.
> Thanks,
> Ahmed
>

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ahmed,

I tried to set up your use case, and for me it all seems to work. However,
I didn't use the Spring framework, and I executed the program in a local Flink
cluster.

Maybe you can put together a self-contained example (including example data)
that reproduces your problem and send it to us.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader <ah...@gmail.com> wrote:

> Hello Flavio,
> Thank you so much for replying. However, I didn't download Flink locally;
> I only added dependencies to a Maven project, so I don't think I'll be able
> to modify the KryoSerializer class. But I also think that's the problem.
> Thanks,
> Ahmed
>
> On 8 June 2016 at 16:07, Flavio Pompermaier <po...@okkam.it> wrote:
>
>> Hi Ahmed,
>> I also have the same error, which is probably caused by the KryoSerializer.
>> Right now I'm testing a patch for this problem, so maybe you could also
>> test it. Unfortunately, I'm using Flink 1.0.2, so I don't know whether you
>> can use my KryoSerializer, but I think so. Basically, I just recreate Input
>> and Output every time in serialize/deserialize, and then I close them.
>>
>> This is my attempt to fix the problem (my version of the KryoSerializer
>> class in the flink-core module):
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>> implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.flink.api.java.typeutils.runtime.kryo;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoException;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> import com.esotericsoftware.kryo.serializers.JavaSerializer;
>> import com.google.common.base.Preconditions;
>>
>> import org.apache.avro.generic.GenericData;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>> import
>> org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
>> import org.apache.flink.core.memory.DataInputView;
>> import org.apache.flink.core.memory.DataOutputView;
>> import org.objenesis.strategy.StdInstantiatorStrategy;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.ByteArrayOutputStream;
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.lang.reflect.InvocationTargetException;
>> import java.lang.reflect.Method;
>> import java.lang.reflect.Modifier;
>> import java.util.LinkedHashMap;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.Objects;
>>
>> /**
>>  * A type serializer that serializes its type using the Kryo serialization
>>  * framework (https://github.com/EsotericSoftware/kryo).
>>  *
>>  * This serializer is intended as a fallback serializer for the cases
>> that are
>>  * not covered by the basic types, tuples, and POJOs.
>>  *
>>  * @param <T> The type to be serialized.
>>  */
>> public class KryoSerializer<T> extends TypeSerializer<T> {
>>
>> private static final long serialVersionUID = 3L;
>>
>> private static final Logger LOG =
>> LoggerFactory.getLogger(KryoSerializer.class);
>>
>> //
>> ------------------------------------------------------------------------
>>
>> private final LinkedHashMap<Class<?>,
>> ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
>> registeredTypesWithSerializerClasses;
>> private final LinkedHashMap<Class<?>,
>> ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
>> defaultSerializerClasses;
>> private final LinkedHashSet<Class<?>> registeredTypes;
>>
>> private final Class<T> type;
>> //
>> ------------------------------------------------------------------------
>> // The fields below are lazily initialized after duplication or
>> deserialization.
>>
>> private transient Kryo kryo;
>> private transient T copyInstance;
>> //
>> ------------------------------------------------------------------------
>>
>> public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
>> this.type = Preconditions.checkNotNull(type);
>>
>> this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
>> this.defaultSerializerClasses =
>> executionConfig.getDefaultKryoSerializerClasses();
>> this.registeredTypesWithSerializers =
>> executionConfig.getRegisteredTypesWithKryoSerializers();
>> this.registeredTypesWithSerializerClasses =
>> executionConfig.getRegisteredTypesWithKryoSerializerClasses();
>> this.registeredTypes = executionConfig.getRegisteredKryoTypes();
>> }
>>
>> /**
>> * Copy-constructor that does not copy transient fields. They will be
>> initialized once required.
>> */
>> protected KryoSerializer(KryoSerializer<T> toCopy) {
>> registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
>> registeredTypesWithSerializerClasses =
>> toCopy.registeredTypesWithSerializerClasses;
>> defaultSerializers = toCopy.defaultSerializers;
>> defaultSerializerClasses = toCopy.defaultSerializerClasses;
>> registeredTypes = toCopy.registeredTypes;
>>
>> type = toCopy.type;
>> if(type == null){
>> throw new NullPointerException("Type class cannot be null.");
>> }
>> }
>>
>> //
>> ------------------------------------------------------------------------
>>
>> @Override
>> public boolean isImmutableType() {
>> return false;
>> }
>>
>> @Override
>> public KryoSerializer<T> duplicate() {
>> return new KryoSerializer<T>(this);
>> }
>>
>> @Override
>> public T createInstance() {
>> if(Modifier.isAbstract(type.getModifiers()) ||
>> Modifier.isInterface(type.getModifiers()) ) {
>> return null;
>> } else {
>> checkKryoInitialized();
>> try {
>> return kryo.newInstance(type);
>> } catch(Throwable e) {
>> return null;
>> }
>> }
>> }
>>
>> @SuppressWarnings("unchecked")
>> @Override
>> public T copy(T from) {
>> if (from == null) {
>> return null;
>> }
>> checkKryoInitialized();
>> try {
>> return kryo.copy(from);
>> }
>> catch(KryoException ke) {
>> // kryo was unable to copy it, so we do it through serialization:
>> ByteArrayOutputStream baout = new ByteArrayOutputStream();
>> Output output = new Output(baout);
>>
>> kryo.writeObject(output, from);
>>
>> output.close();
>>
>> ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
>> Input input = new Input(bain);
>>
>> return (T)kryo.readObject(input, from.getClass());
>> }
>> }
>> @Override
>> public T copy(T from, T reuse) {
>> return copy(from);
>> }
>>
>> @Override
>> public int getLength() {
>> return -1;
>> }
>>
>> @Override
>> public void serialize(T record, DataOutputView target) throws IOException
>> {
>> checkKryoInitialized();
>>   DataOutputViewStream outputStream = new DataOutputViewStream(target);
>>   Output output = new Output(outputStream);
>>
>> try {
>>    // Sanity check: Make sure that the output is cleared/has been flushed
>> by the last call
>>        // otherwise data might be written multiple times in case of a
>> previous EOFException
>>        if (output.position() != 0) {
>>            throw new IllegalStateException("The Kryo Output still
>> contains data from a previous " +
>>                "serialize call. It has to be flushed or cleared at the
>> end of the serialize call.");
>>        }
>>
>> kryo.writeClassAndObject(output, record);
>> output.flush();
>> }
>> catch (KryoException ke) {
>> Throwable cause = ke.getCause();
>> if (cause instanceof EOFException) {
>> throw (EOFException) cause;
>> }
>> else {
>> throw ke;
>> }
>> } finally {
>>      try {
>>               output.close();
>>             } catch (KryoException ke) {
>>               Throwable cause = ke.getCause();
>>
>>               if (cause instanceof EOFException) {
>>                   throw (EOFException) cause;
>>               } else {
>>                   throw ke;
>>               }
>>           }
>> }
>> }
>>
>> @SuppressWarnings("unchecked")
>> @Override
>> public T deserialize(DataInputView source) throws IOException {
>> checkKryoInitialized();
>> DataInputViewStream inputStream = new DataInputViewStream(source);
>> Input input = new NoFetchingInput(inputStream);
>>
>> try {
>> return (T) kryo.readClassAndObject(input);
>> } catch (KryoException ke) {
>> Throwable cause = ke.getCause();
>>
>> if (cause instanceof EOFException) {
>> throw (EOFException) cause;
>> } else {
>> throw ke;
>> }
>> } finally {
>>  try {
>>    input.close();
>>  } catch (KryoException ke) {
>>             Throwable cause = ke.getCause();
>>
>>             if (cause instanceof EOFException) {
>>                 throw (EOFException) cause;
>>             } else {
>>                 throw ke;
>>             }
>>           }
>> }
>> }
>> @Override
>> public T deserialize(T reuse, DataInputView source) throws IOException {
>> return deserialize(source);
>> }
>>
>> @Override
>> public void copy(DataInputView source, DataOutputView target) throws
>> IOException {
>> checkKryoInitialized();
>> if(this.copyInstance == null){
>> this.copyInstance = createInstance();
>> }
>>
>> T tmp = deserialize(copyInstance, source);
>> serialize(tmp, target);
>> }
>> //
>> --------------------------------------------------------------------------------------------
>> @Override
>> public int hashCode() {
>> return Objects.hash(
>> type,
>> registeredTypes,
>> registeredTypesWithSerializerClasses,
>> defaultSerializerClasses);
>> }
>> @Override
>> public boolean equals(Object obj) {
>> if (obj instanceof KryoSerializer) {
>> KryoSerializer<?> other = (KryoSerializer<?>) obj;
>>
>> // we cannot include the Serializers here because they don't implement
>> the equals method
>> return other.canEqual(this) &&
>> type == other.type &&
>> registeredTypes.equals(other.registeredTypes) &&
>> registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
>> &&
>> defaultSerializerClasses.equals(other.defaultSerializerClasses);
>> } else {
>> return false;
>> }
>> }
>>
>> @Override
>> public boolean canEqual(Object obj) {
>> return obj instanceof KryoSerializer;
>> }
>>
>> //
>> --------------------------------------------------------------------------------------------
>>
>> /**
>> * Returns the Chill Kryo Serializer which is implictly added to the
>> classpath via flink-runtime.
>> * Falls back to the default Kryo serializer if it can't be found.
>> * @return The Kryo serializer instance.
>> */
>> private Kryo getKryoInstance() {
>>
>> try {
>> // check if ScalaKryoInstantiator is in class path (coming from Twitter's
>> Chill library).
>> // This will be true if Flink's Scala API is used.
>> Class<?> chillInstantiatorClazz =
>> Class.forName("com.twitter.chill.ScalaKryoInstantiator");
>> Object chillInstantiator = chillInstantiatorClazz.newInstance();
>>
>> // obtain a Kryo instance through Twitter Chill
>> Method m = chillInstantiatorClazz.getMethod("newKryo");
>>
>> return (Kryo) m.invoke(chillInstantiator);
>> } catch (ClassNotFoundException | InstantiationException |
>> NoSuchMethodException |
>> IllegalAccessException | InvocationTargetException e) {
>>
>> LOG.warn("Falling back to default Kryo serializer because Chill
>> serializer couldn't be found.", e);
>>
>> Kryo.DefaultInstantiatorStrategy initStrategy = new
>> Kryo.DefaultInstantiatorStrategy();
>> initStrategy.setFallbackInstantiatorStrategy(new
>> StdInstantiatorStrategy());
>>
>> Kryo kryo = new Kryo();
>> kryo.setInstantiatorStrategy(initStrategy);
>>
>> return kryo;
>> }
>> }
>>
>> private void checkKryoInitialized() {
>> if (this.kryo == null) {
>> this.kryo = getKryoInstance();
>>
>> // Enable reference tracking.
>> kryo.setReferences(true);
>> // Throwable and all subclasses should be serialized via java
>> serialization
>> kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
>>
>> // Add default serializers first, so that they type registrations without
>> a serializer
>> // are registered with a default serializer
>> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>>
>> entry: defaultSerializers.entrySet()) {
>> kryo.addDefaultSerializer(entry.getKey(),
>> entry.getValue().getSerializer());
>> }
>>
>> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry:
>> defaultSerializerClasses.entrySet()) {
>> kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
>> }
>>
>> // register the type of our class
>> kryo.register(type);
>>
>> // register given types. we do this first so that any registration of a
>> // more specific serializer overrides this
>> for (Class<?> type : registeredTypes) {
>> kryo.register(type);
>> }
>>
>> // register given serializer classes
>> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e :
>> registeredTypesWithSerializerClasses.entrySet()) {
>> Class<?> typeClass = e.getKey();
>> Class<? extends Serializer<?>> serializerClass = e.getValue();
>>
>> Serializer<?> serializer =
>> ReflectionSerializerFactory.makeSerializer(kryo, serializerClass,
>> typeClass);
>> kryo.register(typeClass, serializer);
>> }
>>
>> // register given serializers
>> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e :
>> registeredTypesWithSerializers.entrySet()) {
>> kryo.register(e.getKey(), e.getValue().getSerializer());
>> }
>> // this is needed for Avro but can not be added on demand.
>> kryo.register(GenericData.Array.class, new
>> SpecificInstanceCollectionSerializerForArrayList());
>>
>> kryo.setRegistrationRequired(false);
>> kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
>> }
>> }
>>
>> //
>> --------------------------------------------------------------------------------------------
>> // For testing
>> //
>> --------------------------------------------------------------------------------------------
>> public Kryo getKryo() {
>> checkKryoInitialized();
>> return this.kryo;
>> }
>> }
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ah...@gmail.com>
>> wrote:
>>
>>> Hello,
>>> I have a TwitterSource and I'm applying some transformations as filter
>>> and map on the resulting stream from twitter. I'm collecting the output in
>>> an iterator: iterator = DataStreamUtils.collect(datastream). Then in a
>>> parallel thread i periodically check if this iterator.hasNext() and print
>>> the next item. I'm using Flink 1.0.3.
>>> That program works at the beginning and actually prints some items,
>>> however when i leave it running for some more time (Like for example after
>>> 40 seconds or 1 minute) then i get 2 exceptions which are:
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>> ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>>> These 2 exceptions result from the line where i'm checking if the
>>> iterator hasNext().
>>>
>>> I wanted to know why do these exceptions happen in general and also if
>>> anyone knows a specific solution for my program, that would be great too.
>>> Thanks,
>>> Ahmed
>>>
>>
>>
>>
>> --
>>
>> Flavio Pompermaier
>>
>> *Development Department*_______________________________________________
>> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>>
>> *Phone:* +(39) 0461 283 702
>> *Fax:* + (39) 0461 186 6433
>> *Email:* pompermaier@okkam.it
>> *Headquarters:* Trento (Italy), via G.B. Trener 8
>> *Registered office:* Trento (Italy), via Segantini 23
>>
>> Confidentially notice. This e-mail transmission may contain legally
>> privileged and/or confidential information. Please do not read it if you
>> are not the intended recipient(S). Any use, distribution, reproduction or
>> disclosure by any other person is strictly prohibited. If you have received
>> this e-mail in error, please notify the sender and destroy the original
>> transmission and its attachments without reading or saving it in any manner.
>>
>>
>

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Posted by Ahmed Nader <ah...@gmail.com>.
Hello Flavio,
Thank you so much for replying. However, I didn't download Flink locally; I
only added the Flink dependencies to a Maven project, so I don't think I'll
be able to modify the KryoSerializer class. But I also think it's the cause
of the problem.
Thanks,
Ahmed

On 8 June 2016 at 16:07, Flavio Pompermaier <po...@okkam.it> wrote:

> Hi Ahmed,
> I'm getting the same error, and I think it is probably caused by the
> KryoSerializer. Right now I'm testing a patch for this problem, so maybe you
> could test it too. Unfortunately I'm using Flink 1.0.2, so I'm not sure
> whether you can use my KryoSerializer with your version, but I think so.
> What the patch does is recreate the Input and Output on every
> serialize/deserialize call and close them afterwards.

Re: com.esotericsoftware.kryo.KryoException and java.lang.IndexOutOfBoundsException

Posted by Flavio Pompermaier <po...@okkam.it>.
Hi Ahmed,
I also get the same error, which is probably caused by the KryoSerializer.
Right now I'm testing a patch for this problem, so maybe you could test it
too. I'm using Flink 1.0.2, so I'm not sure whether you can use my
KryoSerializer with 1.0.3, but I think so. All I do is recreate the Kryo
Input and Output on every call to serialize()/deserialize() and close them
afterwards.

This is my attempt to fix the problem (it replaces the KryoSerializer class in
the flink-core module):


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 *
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

    private static final long serialVersionUID = 3L;

    private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

    // ------------------------------------------------------------------------

    private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
    private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
    private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
    private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
    private final LinkedHashSet<Class<?>> registeredTypes;

    private final Class<T> type;

    // ------------------------------------------------------------------------
    // The fields below are lazily initialized after duplication or deserialization.

    private transient Kryo kryo;
    private transient T copyInstance;

    // ------------------------------------------------------------------------

    public KryoSerializer(Class<T> type, ExecutionConfig executionConfig) {
        this.type = Preconditions.checkNotNull(type);

        this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
        this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
        this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
        this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
        this.registeredTypes = executionConfig.getRegisteredKryoTypes();
    }

    /**
     * Copy-constructor that does not copy transient fields. They will be
     * initialized once required.
     */
    protected KryoSerializer(KryoSerializer<T> toCopy) {
        registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
        registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
        defaultSerializers = toCopy.defaultSerializers;
        defaultSerializerClasses = toCopy.defaultSerializerClasses;
        registeredTypes = toCopy.registeredTypes;

        type = toCopy.type;
        if (type == null) {
            throw new NullPointerException("Type class cannot be null.");
        }
    }

    // ------------------------------------------------------------------------

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public KryoSerializer<T> duplicate() {
        return new KryoSerializer<T>(this);
    }

    @Override
    public T createInstance() {
        if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
            return null;
        } else {
            checkKryoInitialized();
            try {
                return kryo.newInstance(type);
            } catch (Throwable e) {
                return null;
            }
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T copy(T from) {
        if (from == null) {
            return null;
        }
        checkKryoInitialized();
        try {
            return kryo.copy(from);
        }
        catch (KryoException ke) {
            // Kryo was unable to copy it, so we do it through serialization:
            ByteArrayOutputStream baout = new ByteArrayOutputStream();
            Output output = new Output(baout);

            kryo.writeObject(output, from);

            output.close();

            ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
            Input input = new Input(bain);

            return (T) kryo.readObject(input, from.getClass());
        }
    }

    @Override
    public T copy(T from, T reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(T record, DataOutputView target) throws IOException {
        checkKryoInitialized();
        // A fresh Output is created for every call instead of being reused across calls.
        DataOutputViewStream outputStream = new DataOutputViewStream(target);
        Output output = new Output(outputStream);

        try {
            // Sanity check: make sure that the output is cleared/has been flushed by the last call,
            // otherwise data might be written multiple times in case of a previous EOFException.
            // (With the fresh Output created above, this check should always pass.)
            if (output.position() != 0) {
                throw new IllegalStateException("The Kryo Output still contains data from a previous " +
                    "serialize call. It has to be flushed or cleared at the end of the serialize call.");
            }

            kryo.writeClassAndObject(output, record);
            output.flush();
        }
        catch (KryoException ke) {
            // Discard any buffered bytes so that close() below does not try to flush them again.
            output.clear();
            Throwable cause = ke.getCause();
            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            }
            else {
                throw ke;
            }
        }
        finally {
            try {
                output.close();
            }
            catch (KryoException ke) {
                Throwable cause = ke.getCause();
                if (cause instanceof EOFException) {
                    throw (EOFException) cause;
                }
                else {
                    throw ke;
                }
            }
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(DataInputView source) throws IOException {
        checkKryoInitialized();
        // A fresh Input is created for every call instead of being reused across calls.
        DataInputViewStream inputStream = new DataInputViewStream(source);
        Input input = new NoFetchingInput(inputStream);

        try {
            return (T) kryo.readClassAndObject(input);
        } catch (KryoException ke) {
            Throwable cause = ke.getCause();

            if (cause instanceof EOFException) {
                throw (EOFException) cause;
            } else {
                throw ke;
            }
        } finally {
            try {
                input.close();
            } catch (KryoException ke) {
                Throwable cause = ke.getCause();

                if (cause instanceof EOFException) {
                    throw (EOFException) cause;
                } else {
                    throw ke;
                }
            }
        }
    }

    @Override
    public T deserialize(T reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        checkKryoInitialized();
        if (this.copyInstance == null) {
            this.copyInstance = createInstance();
        }

        T tmp = deserialize(copyInstance, source);
        serialize(tmp, target);
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return Objects.hash(
            type,
            registeredTypes,
            registeredTypesWithSerializerClasses,
            defaultSerializerClasses);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof KryoSerializer) {
            KryoSerializer<?> other = (KryoSerializer<?>) obj;

            // we cannot include the Serializers here because they don't implement the equals method
            return other.canEqual(this) &&
                type == other.type &&
                registeredTypes.equals(other.registeredTypes) &&
                registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
                defaultSerializerClasses.equals(other.defaultSerializerClasses);
        } else {
            return false;
        }
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof KryoSerializer;
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
     * Falls back to the default Kryo serializer if it can't be found.
     * @return The Kryo serializer instance.
     */
    private Kryo getKryoInstance() {

        try {
            // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
            // This will be true if Flink's Scala API is used.
            Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
            Object chillInstantiator = chillInstantiatorClazz.newInstance();

            // obtain a Kryo instance through Twitter Chill
            Method m = chillInstantiatorClazz.getMethod("newKryo");

            return (Kryo) m.invoke(chillInstantiator);
        } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
                IllegalAccessException | InvocationTargetException e) {

            LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

            Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
            initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

            Kryo kryo = new Kryo();
            kryo.setInstantiatorStrategy(initStrategy);

            return kryo;
        }
    }

    private void checkKryoInitialized() {
        if (this.kryo == null) {
            this.kryo = getKryoInstance();

            // Enable reference tracking.
            kryo.setReferences(true);

            // Throwable and all subclasses should be serialized via java serialization
            kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

            // Add default serializers first, so that the type registrations without a serializer
            // are registered with a default serializer
            for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : defaultSerializers.entrySet()) {
                kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
            }

            for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry : defaultSerializerClasses.entrySet()) {
                kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
            }

            // register the type of our class
            kryo.register(type);

            // register given types. we do this first so that any registration of a
            // more specific serializer overrides this
            for (Class<?> registeredType : registeredTypes) {
                kryo.register(registeredType);
            }

            // register given serializer classes
            for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
                Class<?> typeClass = e.getKey();
                Class<? extends Serializer<?>> serializerClass = e.getValue();

                Serializer<?> serializer =
                    ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
                kryo.register(typeClass, serializer);
            }

            // register given serializers
            for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
                kryo.register(e.getKey(), e.getValue().getSerializer());
            }

            // this is needed for Avro but can not be added on demand.
            kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

            kryo.setRegistrationRequired(false);
            kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
        }
    }

    // --------------------------------------------------------------------------------------------
    // For testing
    // --------------------------------------------------------------------------------------------

    public Kryo getKryo() {
        checkKryoInitialized();
        return this.kryo;
    }
}
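The copy(T from) method above falls back to a serialize-then-deserialize round trip when kryo.copy() throws. The same deep-copy technique can be sketched with standard Java serialization instead of Kryo. This is an illustrative substitute, not the patch's code: the SerializationCopy class is hypothetical, and it only works for types that implement java.io.Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical helper: deep copy through a serialization round trip,
// using Java serialization in place of Kryo.
final class SerializationCopy {

    private SerializationCopy() {}

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T from) {
        if (from == null) {
            return null;
        }
        // Write the object graph into an in-memory buffer...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(from);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // ...and read it back as a new, independent object graph.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Like the Kryo fallback, this costs a full encode and decode per copy, so it fits better as a fallback path than as the default.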

Best,
Flavio
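On the "Encountered unregistered class ID" message itself: Kryo writes a registered class as a small integer ID assigned in registration order, so the writing and the reading side must register the same classes in the same order (the patch keeps its registrations in LinkedHashMap/LinkedHashSet, which preserve insertion order). The toy model below is hypothetical (ToyClassRegistry is not a Kryo class, and real Kryo reserves some IDs for built-in types). It only sketches the mechanism: if the reader's ID table differs from the writer's, or the reader interprets bytes at the wrong position as an ID, the lookup returns the wrong class or fails with an unregistered ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of an ID-based class registry (not Kryo's actual classes).
final class ToyClassRegistry {
    private final Map<Class<?>, Integer> idsByClass = new HashMap<>();
    private final List<Class<?>> classesById = new ArrayList<>();

    // IDs are handed out in registration order; re-registering returns the existing ID.
    int register(Class<?> type) {
        Integer existing = idsByClass.get(type);
        if (existing != null) {
            return existing;
        }
        int id = classesById.size();
        classesById.add(type);
        idsByClass.put(type, id);
        return id;
    }

    // Writer side: the ID that would be written for this class.
    int idOf(Class<?> type) {
        Integer id = idsByClass.get(type);
        if (id == null) {
            throw new IllegalArgumentException("Unregistered class: " + type.getName());
        }
        return id;
    }

    // Reader side: resolves an ID read from the stream back to a class.
    Class<?> classFor(int id) {
        if (id < 0 || id >= classesById.size()) {
            // Analogous to Kryo's "Encountered unregistered class ID" error.
            throw new IllegalStateException("Encountered unregistered class ID: " + id);
        }
        return classesById.get(id);
    }
}
```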



