Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2015/02/06 18:48:34 UTC

[jira] [Resolved] (FLINK-1438) ClassCastException for Custom InputSplit in local mode and invalid type code in distributed mode

     [ https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-1438.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9
         Assignee: Stephan Ewen

Fixed via a07d59d72fc059a600a3eb1f479b02964ca256f5

> ClassCastException for Custom InputSplit in local mode and invalid type code in distributed mode
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-1438
>                 URL: https://issues.apache.org/jira/browse/FLINK-1438
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.8, 0.9
>            Reporter: Fabian Hueske
>            Assignee: Stephan Ewen
>            Priority: Minor
>             Fix For: 0.9
>
>
> Jobs with custom InputSplits fail with a ClassCastException such as {{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit cannot be cast to org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} if executed on a local setup. 
> This issue is probably related to different ClassLoaders being used when the JobManager generates the InputSplits and when the TaskManager hands them to the InputFormat. Moving the class of the custom InputSplit into the {{./lib}} folder and removing it from the job's jar makes the job work.
> To reproduce the bug, run the following job on a local setup. 
> {code}
> public class CustomSplitTestJob {
> 	public static void main(String[] args) throws Exception {
> 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 		DataSet<String> x = env.createInput(new TestFileInputFormat());
> 		x.print();
> 		env.execute();
> 	}
> 	public static class TestFileInputFormat implements InputFormat<String,TestFileInputSplit> {
> 		@Override
> 		public void configure(Configuration parameters) {
> 		}
> 		@Override
> 		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
> 			return null;
> 		}
> 		@Override
> 		public TestFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
> 			return new TestFileInputSplit[]{new TestFileInputSplit()};
> 		}
> 		@Override
> 		public InputSplitAssigner getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
> 			return new LocatableInputSplitAssigner(inputSplits);
> 		}
> 		@Override
> 		public void open(TestFileInputSplit split) throws IOException {
> 		}
> 		@Override
> 		public boolean reachedEnd() throws IOException {
> 			return false;
> 		}
> 		@Override
> 		public String nextRecord(String reuse) throws IOException {
> 			return null;
> 		}
> 		@Override
> 		public void close() throws IOException {
> 		}
> 	}
> 	public static class TestFileInputSplit extends FileInputSplit {
> 	}
> }
> {code}
> The same happens in distributed mode, except that Akka terminates the transmission of the input split with an unhelpful {{invalid type code: 00}} error.
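
The ClassLoader explanation in the description can be illustrated outside of Flink. The following is a minimal sketch, not Flink code: the names ClassLoaderIdentityDemo, Split, and IsolatingLoader are hypothetical, and Split merely stands in for a user-defined InputSplit. It assumes the compiled classes are readable as resources on the classpath. It shows that a class defined by two different ClassLoaders keeps the same name but is treated as two distinct types, so a cast between them fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for a user-defined InputSplit shipped with the job. */
    public static class Split {
    }

    /** Defines the target class itself instead of delegating to its parent. */
    static class IsolatingLoader extends ClassLoader {
        private final String target;

        IsolatingLoader(ClassLoader parent, String target) {
            super(parent);
            this.target = target;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(target)) {
                return super.loadClass(name, resolve); // normal parent-first delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent loader's resources. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads a second copy of Split through a separate loader. */
    static Class<?> loadIsolated() throws ClassNotFoundException {
        ClassLoader parent = ClassLoaderIdentityDemo.class.getClassLoader();
        String name = Split.class.getName();
        return new IsolatingLoader(parent, name).loadClass(name);
    }

    /** Returns true if casting the object to this loader's Split throws. */
    static boolean castFails(Object candidate) {
        try {
            Split ignored = (Split) candidate;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> other = loadIsolated();
        System.out.println("same name:  " + other.getName().equals(Split.class.getName()));
        System.out.println("same class: " + (other == Split.class));
        Object instance = other.getDeclaredConstructor().newInstance();
        System.out.println("cast fails: " + castFails(instance));
    }
}
```

If the sketch behaves as intended, the two classes report the same name, compare as different Class objects, and the cast is rejected. That is the same shape as the "X cannot be cast to X" message quoted in the description.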
