You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:12 UTC

[22/27] flink git commit: [storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
new file mode 100644
index 0000000..53b84ef
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.InetAddress;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * A specialized data sink to be used by DataStreamUtils.collect.
+ *
+ * <p>{@code open()} connects a TCP socket to the configured host and port, {@code invoke()}
+ * serializes each incoming record onto that socket with the supplied {@link TypeSerializer},
+ * and {@code close()} flushes and closes the connection.
+ */
+class CollectSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(CollectSink.class);
+
+	private final InetAddress hostIp;
+	private final int port;
+	private final TypeSerializer<IN> serializer;
+
+	// Runtime-only state: both are created in initializeConnection() and released in
+	// closeConnection(), so neither takes part in Java serialization of the sink.
+	private transient Socket client;
+	private transient StreamWriterDataOutputView streamWriter;
+
+	/**
+	 * Creates a CollectSink that will send the data to the specified host.
+	 *
+	 * @param hostIp IP address of the Socket server.
+	 * @param port Port of the Socket server.
+	 * @param serializer A serializer for the data.
+	 */
+	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
+		this.hostIp = hostIp;
+		this.port = port;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 *
+	 * @throws RuntimeException wrapping the IOException if the socket cannot be opened
+	 */
+	public void initializeConnection() {
+		try {
+			client = new Socket(hostIp, port);
+			streamWriter = new StreamWriterDataOutputView(client.getOutputStream());
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			serializer.serialize(value, streamWriter);
+		} catch (IOException e) {
+			// NOTE(review): a failed write is only logged, so the record is dropped and the
+			// job keeps running. Kept as-is because it looks like deliberate best-effort
+			// behavior; confirm whether the job should fail instead.
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send message to socket server at " + hostIp.toString() + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 *
+	 * <p>Does nothing when no connection was established (e.g. open() failed or was never
+	 * called), and is safe to call more than once.
+	 *
+	 * @throws RuntimeException wrapping the IOException if flushing or closing fails
+	 */
+	private void closeConnection() {
+		if (client == null) {
+			return;
+		}
+		try {
+			if (streamWriter != null) {
+				streamWriter.flush();
+			}
+			client.close();
+		} catch (IOException e) {
+			// Flushing or the first close attempt failed: still try to release the socket
+			// before reporting the original problem.
+			try {
+				client.close();
+			} catch (IOException closeFailure) {
+				LOG.error("Cannot close connection with socket server at "
+						+ hostIp.toString() + ":" + port, closeFailure);
+			}
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostIp.toString() + ":" + port, e);
+		} finally {
+			client = null;
+			streamWriter = null;
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		initializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+	/**
+	 * Adapts a plain {@link OutputStream} to the {@link DataOutputView} interface that
+	 * {@link TypeSerializer#serialize} writes to.
+	 */
+	private static class StreamWriterDataOutputView extends DataOutputStream implements DataOutputView {
+
+		public StreamWriterDataOutputView(OutputStream stream) {
+			super(stream);
+		}
+
+		/** Writes {@code numBytes} zero bytes to the stream. */
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			for (int i = 0; i < numBytes; i++) {
+				write(0);
+			}
+		}
+
+		/** Copies exactly {@code numBytes} bytes from {@code source} to the stream. */
+		public void write(DataInputView source, int numBytes) throws IOException {
+			byte[] data = new byte[numBytes];
+			source.readFully(data);
+			write(data);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
new file mode 100644
index 0000000..a98740e
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.util.Iterator;
+import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.io.DataInputStream;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+/**
+ * An iterator that receives the elements of a DataStream over a local TCP connection.
+ *
+ * <p>The constructor opens a server socket on a free port and starts a background thread
+ * that accepts a single connection. Elements are then read from that connection with the
+ * given {@link TypeSerializer}; the end of the TCP stream marks the end of the iteration.
+ */
+class DataStreamIterator<T> implements Iterator<T> {
+
+	ServerSocket socket;
+	InputStream tcpStream;
+	T next;
+	private final CountDownLatch connectionAccepted = new CountDownLatch(1);
+	private volatile StreamReaderDataInputView streamReader;
+	// Set by the accept thread when accepting the connection failed.
+	private volatile IOException acceptError;
+	private final TypeSerializer<T> serializer;
+	// Cached because the server socket is closed once the stream has been fully consumed.
+	private final int port;
+	private boolean endOfStream;
+
+	DataStreamIterator(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+		try {
+			socket = new ServerSocket(0, 1, null);
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator: an I/O error occurred when opening the socket", e);
+		}
+		port = socket.getLocalPort();
+		(new AcceptThread()).start();
+	}
+
+	/** Accepts the single incoming connection and publishes its input stream. */
+	private class AcceptThread extends Thread {
+		public void run() {
+			try {
+				tcpStream = socket.accept().getInputStream();
+				streamReader = new StreamReaderDataInputView(tcpStream);
+			} catch (IOException e) {
+				// Hand the failure to the consuming thread instead of dying silently here.
+				acceptError = e;
+			} finally {
+				// Always release the latch, otherwise readers would block forever on failure.
+				connectionAccepted.countDown();
+			}
+		}
+	}
+
+	/**
+	 * Returns the port on which the iterator is getting the data. (Used internally.)
+	 * @return The port
+	 */
+	public int getPort() {
+		return port;
+	}
+
+	/**
+	 * Returns true if the DataStream has more elements.
+	 * (Note: blocks if there will be more elements, but they are not available yet.)
+	 * @return true if the DataStream has more elements
+	 */
+	@Override
+	public boolean hasNext() {
+		if (next == null) {
+			readNextFromStream();
+		}
+		return next != null;
+	}
+
+	/**
+	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+	 * @return The element
+	 * @throws NoSuchElementException if the stream has already ended
+	 */
+	@Override
+	public T next() {
+		if (next == null) {
+			readNextFromStream();
+			if (next == null) {
+				throw new NoSuchElementException();
+			}
+		}
+		T current = next;
+		next = null;
+		return current;
+	}
+
+	/**
+	 * Waits for the connection and reads one element into {@code next}; leaves {@code next}
+	 * as null once the end of the stream has been reached.
+	 */
+	private void readNextFromStream() {
+		if (endOfStream) {
+			return;
+		}
+		try {
+			connectionAccepted.await();
+		} catch (InterruptedException e) {
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("The calling thread of DataStreamIterator.readNextFromStream was interrupted.", e);
+		}
+		if (acceptError != null) {
+			throw new RuntimeException("DataStreamIterator.AcceptThread failed", acceptError);
+		}
+		try {
+			next = serializer.deserialize(streamReader);
+		} catch (EOFException e) {
+			next = null;
+			endOfStream = true;
+			releaseResources();
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator could not read from deserializedStream", e);
+		}
+	}
+
+	/** Closes the connection and the server socket after the stream has ended (best effort). */
+	private void releaseResources() {
+		try {
+			if (streamReader != null) {
+				streamReader.close();
+			}
+		} catch (IOException ignored) {
+			// the stream is already exhausted, nothing left to lose
+		}
+		try {
+			socket.close();
+		} catch (IOException ignored) {
+			// nothing more can be done for a socket that fails to close
+		}
+	}
+
+	/** Adapts a plain {@link InputStream} to the {@link DataInputView} interface. */
+	private static class StreamReaderDataInputView extends DataInputStream implements DataInputView {
+
+		public StreamReaderDataInputView(InputStream stream) {
+			super(stream);
+		}
+
+		/**
+		 * Skips exactly {@code numBytes} bytes.
+		 *
+		 * @throws EOFException if the stream ends before all bytes were skipped
+		 */
+		public void skipBytesToRead(int numBytes) throws IOException {
+			while (numBytes > 0) {
+				int skipped = skipBytes(numBytes);
+				if (skipped == 0) {
+					// skipBytes() made no progress; consume one byte explicitly. readByte()
+					// throws EOFException at the end of the stream, which ends the loop
+					// instead of spinning forever.
+					readByte();
+					skipped = 1;
+				}
+				numBytes -= skipped;
+			}
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
new file mode 100644
index 0000000..276409d
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.runtime.net.NetUtils;
+
+public class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 *
+	 * <p>A {@link CollectSink} with parallelism 1 is attached to the stream and the job is
+	 * executed in a background thread; the sink sends the elements to the returned iterator
+	 * over a TCP connection.
+	 *
+	 * @param stream The stream whose elements are collected
+	 * @return The iterator
+	 * @throws RuntimeException if no address could be determined for the sink to connect to
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		TypeSerializer<OUT> serializer = stream.getType().createSerializer(env.getConfig());
+		DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer);
+
+		//Find out what IP of us should be given to CollectSink, that it will be able to connect to
+		InetAddress clientAddress;
+		if (env instanceof RemoteStreamEnvironment) {
+			RemoteStreamEnvironment remoteEnv = (RemoteStreamEnvironment) env;
+			String host = remoteEnv.getHost();
+			int port = remoteEnv.getPort();
+			try {
+				clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+			} catch (IOException e) {
+				throw new RuntimeException("IOException while trying to connect to the master", e);
+			}
+		} else {
+			try {
+				clientAddress = InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new RuntimeException("getLocalHost failed", e);
+			}
+		}
+
+		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer));
+		sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
+
+		(new CallExecute<OUT>(stream)).start();
+
+		return it;
+	}
+
+	/** Runs the job of the given stream's environment in its own thread. */
+	private static class CallExecute<OUT> extends Thread {
+
+		private final DataStream<OUT> stream;
+
+		public CallExecute(DataStream<OUT> stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public void run() {
+			try {
+				stream.getExecutionEnvironment().execute();
+			} catch (Exception e) {
+				throw new RuntimeException("Exception in execute()", e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
new file mode 100644
index 0000000..f21e58c
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Test;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.junit.Assert;
+
+import java.util.Iterator;
+
+/**
+ * This test verifies the behavior of DataStreamUtils.collect.
+ *
+ * <p>A sequence 1..N is generated on a mini cluster and the test checks that the collected
+ * iterator yields exactly these numbers, in order.
+ */
+public class CollectITCase {
+
+	@Test
+	public void testCollect() {
+
+		Configuration config = new Configuration();
+		// NOTE(review): the mini cluster is never shut down in this test; confirm whether an
+		// explicit shutdown is needed to release its resources.
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+				"localhost", cluster.getJobManagerRPCPort());
+
+		long N = 10;
+		DataStream<Long> stream = env.generateSequence(1, N);
+
+		long i = 1;
+		Iterator<Long> it = DataStreamUtils.collect(stream);
+		while (it.hasNext()) {
+			long x = it.next();
+			if (x != i) {
+				Assert.fail(String.format("Should have got %d, got %d instead.", i, x));
+			}
+			i++;
+		}
+		if (i != N + 1) {
+			Assert.fail(String.format("Should have collected %d numbers, got %d instead.", N, i - 1));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/pom.xml b/flink-contrib/flink-tweet-inputformat/pom.xml
new file mode 100644
index 0000000..6762a0b
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-contrib-parent</artifactId>
+		<version>0.9-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-tweet-inputformat</artifactId>
+	<name>flink-tweet-inputformat</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1.1</version>
+		</dependency>
+	</dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
new file mode 100644
index 0000000..a72fc14
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+	private transient JSONParser parser;
+	private transient TweetHandler handler;
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		this.handler = new TweetHandler();
+		this.parser = new JSONParser();
+	}
+
+	@Override
+	public Tweet nextRecord(Tweet record) throws IOException {
+		Boolean result = false;
+
+		do {
+			try {
+				record.reset(0);
+				record = super.nextRecord(record);
+				result = true;
+
+			} catch (JsonParseException e) {
+				result = false;
+
+			}
+		} while (!result);
+
+		return record;
+	}
+
+	@Override
+	public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+		InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+		jsonReader.skip(offset);
+
+		try {
+
+			handler.reuse = reuse;
+			parser.parse(jsonReader, handler, false);
+		} catch (ParseException e) {
+
+			LOG.debug(" Tweet Parsing Exception : " + e.getMessage());
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public TypeInformation<Tweet> getProducedType() {
+		return new GenericTypeInfo<Tweet>(Tweet.class);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
new file mode 100644
index 0000000..2b9f389
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Contributors;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.json.simple.parser.ContentHandler;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+public class TweetHandler implements ContentHandler {
+
+	private static final Logger logger = LoggerFactory.getLogger(TweetHandler.class);
+
+	// The Tweet that the primitive handlers write into; SimpleTweetInputFormat.readRecord
+	// assigns it before every parse.
+	protected Tweet reuse;
+
+	// Current object depth: incremented in startObject(), decremented in endObject().
+	private int nesting = 0;
+
+	// Which object the current values belong to; chosen in startObjectEntry() and set back
+	// to TWEET in endObject() when the depth returns to 1.
+	private ObjectState objectState = ObjectState.TWEET;
+
+	// The entry (key) whose value is expected next; set from the key in startObjectEntry().
+	private EntryState entryState = EntryState.UNEXPECTED;
+
+	// Gate for reading hashtag texts: set to true in startJSON() and to false in endArray()
+	// (under the HASHTAGS / INDICES / depth 2 condition), so only one set of hashtags is read.
+	private boolean sameHashTag = false;
+
+	// to handle the coordinates special case of nesting primitive types
+	// (both values are reset in endArray() while objectState is COORDINATES)
+	private int coordinatesCounter = 0;
+
+	private double coordinatesTemp = 0.0d;
+
+
+	/**
+	 * Called by the parser before a new JSON document is processed.
+	 *
+	 * <p>The handler instance is reused for many records, so all parser-tracking state is
+	 * put back to its initial value here. Otherwise a record whose parsing was aborted
+	 * half-way (for example by a ParseException) would leave a wrong nesting depth and
+	 * object state behind for every following record.
+	 */
+	@Override
+	public void startJSON() throws ParseException, IOException {
+		nesting = 0;
+		objectState = ObjectState.TWEET;
+		entryState = EntryState.UNEXPECTED;
+		coordinatesCounter = 0;
+		coordinatesTemp = 0.0d;
+		sameHashTag = true;
+	}
+
+	/**
+	 * Called by the parser when the JSON document ends; nothing has to be done here.
+	 */
+	@Override
+	public void endJSON() throws ParseException, IOException {
+		// intentionally empty
+	}
+
+	/**
+	 * Records that the parser entered one more level of JSON object nesting.
+	 *
+	 * @return always true
+	 */
+	@Override
+	public boolean startObject() throws ParseException, IOException {
+		nesting += 1;
+		return true;
+	}
+
+	/**
+	 * Records that the parser left one level of JSON object nesting. When the depth is back
+	 * at 1, following values are attributed to the tweet itself again.
+	 *
+	 * @return true while the parser is still inside an object, false once the outermost
+	 *         object has been closed
+	 */
+	@Override
+	public boolean endObject() throws ParseException, IOException {
+		nesting -= 1;
+
+		if (nesting == 1) {
+			objectState = ObjectState.TWEET;
+		}
+
+		// JSONParser checks "!contentHandler.endObject()", so false must only be returned
+		// when the outermost object is finished.
+		final boolean stillInsideObject = nesting > 0;
+		return stillInsideObject;
+	}
+
+	/**
+	 * Called by the parser for every key of a JSON object. Keys that open one of the
+	 * tracked objects switch {@code objectState}; every other key selects the
+	 * {@code entryState} that tells the primitive handlers which field comes next.
+	 *
+	 * @param key The JSON key that was just read
+	 * @return always true
+	 */
+	@Override
+	public boolean startObjectEntry(String key) throws ParseException, IOException {
+
+		// Locale.ROOT keeps the enum lookup independent of the default locale (a Turkish
+		// locale, for example, upper-cases 'i' to a dotted capital I and breaks valueOf).
+		final String constantName = key.toUpperCase(java.util.Locale.ROOT);
+
+		final boolean alwaysObjectKey = key.equals("contributors") || key.equals("user")
+				|| key.equals("geo") || key.equals("place") || key.equals("attributes")
+				|| key.equals("bounding_box");
+		final boolean hashTagsKey = key.equals("hashtags") && nesting == 2;
+		final boolean coordinatesKey = key.equals("coordinates") && nesting == 1;
+
+		if (alwaysObjectKey || hashTagsKey || coordinatesKey) {
+			objectState = ObjectState.valueOf(constantName);
+		} else {
+			try {
+				entryState = EntryState.valueOf(constantName);
+			} catch (IllegalArgumentException e) {
+				// Unknown key: forget the previous entry, otherwise the value of this key
+				// would be stored into the field selected by the preceding key.
+				entryState = EntryState.UNEXPECTED;
+				logger.debug(e.getMessage());
+			}
+		}
+
+		return true;
+	}
+
+	/**
+	 * Called by the parser after the value of an object entry has been consumed.
+	 *
+	 * <p>When a top-level entry (depth 1) ends, following values belong to the tweet itself
+	 * again. Nested objects are already handled by endObject(); this additionally covers
+	 * top-level entries whose value is not an object (an array, or a JSON null such as
+	 * {@code "geo": null}), which would otherwise leave the object state of that entry
+	 * active for the following top-level fields.
+	 *
+	 * @return always true
+	 */
+	@Override
+	public boolean endObjectEntry() throws ParseException, IOException {
+
+		if (nesting == 1) {
+			objectState = ObjectState.TWEET;
+		}
+
+		return true;
+	}
+
+	/**
+	 * Called by the parser when a JSON array starts; no state has to be updated.
+	 *
+	 * @return always true
+	 */
+	@Override
+	public boolean startArray() throws ParseException, IOException {
+		return true;
+	}
+
+	/**
+	 * Called by the parser when a JSON array ends. Resets the coordinate bookkeeping while
+	 * coordinates are being read, and stops hashtag collection under the HASHTAGS / INDICES /
+	 * depth 2 condition.
+	 *
+	 * @return always true
+	 */
+	@Override
+	public boolean endArray() throws ParseException, IOException {
+
+		final boolean inCoordinates = objectState == ObjectState.COORDINATES;
+		final boolean hashTagsRead = objectState == ObjectState.HASHTAGS
+				&& entryState == EntryState.INDICES
+				&& nesting == 2;
+
+		if (inCoordinates) {
+			coordinatesCounter = 0;
+			coordinatesTemp = 0.0d;
+		}
+
+		// Some tweets have HashTags twice, this condition to read only one of them
+		if (hashTagsRead) {
+			sameHashTag = false;
+		}
+
+		return true;
+	}
+
+	/**
+	 * Called by the parser for every primitive JSON value and forwards it to the handler
+	 * that belongs to the current {@code objectState}. Values inside "geo" are ignored.
+	 * Any exception raised by a handler (for example an unexpected value type) is logged at
+	 * debug level and does not abort parsing.
+	 *
+	 * @param value The primitive value as delivered by the parser; may be null
+	 * @return always true
+	 */
+	@Override
+	public boolean primitive(Object value) throws ParseException, IOException {
+
+		try {
+
+			if (objectState == ObjectState.TWEET) {
+				tweetObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.USER) {
+				userObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.GEO) {
+				// values of the "geo" object are not stored
+				return true;
+			} else if (objectState == ObjectState.COORDINATES) {
+				coordinatesObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.PLACE) {
+				placeObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.ATTRIBUTES) {
+				placeAttributesObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.CONTRIBUTORS) {
+				contributorsObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.HASHTAGS && entryState == EntryState.TEXT && sameHashTag) {
+				hashTagsObjectStatePrimitiveHandler(value);
+			}
+		} catch (Exception e) {
+			// Deliberately broad: a single unexpected value must not fail the whole record.
+			logger.debug("Error in primitive type:  " + e.getMessage());
+		}
+
+		return true;
+	}
+
+	public void tweetObjectStatePrimitiveHandler(Object value) {
+
+		switch (entryState) {
+			case CREATED_AT:
+				if (value != null) {
+					reuse.setCreated_at((String) value);
+				}
+				break;
+			case TEXT:
+				if (value != null) {
+					reuse.setText((String) value);
+				}
+				break;
+			case ID:
+				if (value != null) {
+					reuse.setId((Long) value);
+				}
+				break;
+			case ID_STR:
+				if (value != null) {
+					reuse.setId_str((String) value);
+				}
+				break;
+			case SOURCE:
+				if (value != null) {
+					reuse.setSource((String) value);
+				}
+				break;
+			case TRUNCATED:
+				if (value != null) {
+					reuse.setTruncated((Boolean) value);
+				}
+				break;
+			case IN_REPLY_TO_STATUS_ID:
+				if (value != null) {
+					reuse.setIn_reply_to_status_id((Long) value);
+				}
+				break;
+			case IN_REPLY_TO_STATUS_ID_STR:
+				if (value != null) {
+					reuse.setIn_reply_to_status_id_str((String) value);
+				}
+				break;
+			case IN_REPLY_TO_USER_ID:
+				if (value != null) {
+					reuse.setIn_reply_to_user_id((Long) value);
+				}
+				break;
+			case IN_REPLY_TO_USER_ID_STR:
+				if (value != null) {
+					reuse.setIn_reply_to_user_id_str((String) value);
+				}
+				break;
+			case IN_REPLY_TO_SCREEN_NAME:
+				if (value != null) {
+					reuse.setIn_reply_to_screen_name((String) value);
+				}
+				break;
+			case RETWEET_COUNT:
+				if (value != null) {
+					reuse.setRetweet_count((Long) value);
+				}
+				break;
+			case FAVORITE_COUNT:
+				if (value != null) {
+					reuse.setFavorite_count((Long) value);
+				}
+				break;
+			case FAVORITED:
+				if (value != null) {
+					reuse.setFavorited((Boolean) value);
+				}
+				break;
+			case RETWEETED:
+				if (value != null) {
+					reuse.setRetweeted((Boolean) value);
+				}
+				break;
+			case POSSIBLY_SENSITIVE:
+				if (value != null) {
+					reuse.setPossibly_sensitive((Boolean) value);
+				}
+				break;
+			case FILTER_LEVEL:
+				if (value != null) {
+					reuse.setFilter_level((String) value);
+				}
+				break;
+			case LANG:
+				if (value != null) {
+					reuse.setLang((String) value);
+				}
+				break;
+		}
+	}
+
+	/**
+	 * Copies one primitive value into the user object returned by {@code reuse.getUser()}.
+	 * The property to fill is selected by the current {@code entryState}, and the value is
+	 * cast to the type that property's setter expects. A {@code null} value is skipped, and
+	 * entry states without a case below leave the user object untouched.
+	 *
+	 * @param value the primitive to store, may be {@code null}
+	 */
+	public void userObjectStatePrimitiveHandler(Object value) {
+		final boolean present = value != null;
+
+		switch (entryState) {
+			case ID:
+				if (!present) {
+					break;
+				}
+				if (value instanceof String) {
+					// Wrong values in the "id" field can arrive as strings: parse them and
+					// fall back to 0 when the text is not numeric.
+					final String raw = (String) value;
+					try {
+						reuse.getUser().setId(Long.parseLong(raw));
+					} catch (NumberFormatException e) {
+						reuse.getUser().setId(0L);
+						logger.debug("This Tweet_ID is not a numeric type : " + raw);
+					}
+				} else {
+					reuse.getUser().setId((Long) value);
+				}
+				break;
+
+			// ---- identity and profile texts ----
+			case ID_STR:
+				if (present) { reuse.getUser().setId_str((String) value); }
+				break;
+			case NAME:
+				if (present) { reuse.getUser().setName((String) value); }
+				break;
+			case SCREEN_NAME:
+				if (present) { reuse.getUser().setScreen_name((String) value); }
+				break;
+			case LOCATION:
+				if (present) { reuse.getUser().setLocation((String) value); }
+				break;
+			case URL:
+				if (present) { reuse.getUser().setUrl((String) value); }
+				break;
+			case DESCRIPTION:
+				if (present) { reuse.getUser().setDescription((String) value); }
+				break;
+			case PROTECTED:
+				if (present) { reuse.getUser().setProtected_tweet((Boolean) value); }
+				break;
+			case VERIFIED:
+				if (present) { reuse.getUser().setVerified((Boolean) value); }
+				break;
+
+			// ---- counters ----
+			case FOLLOWERS_COUNT:
+				if (present) { reuse.getUser().setFollowers_count((Long) value); }
+				break;
+			case FRIENDS_COUNT:
+				if (present) { reuse.getUser().setFriends_count((Long) value); }
+				break;
+			case LISTED_COUNT:
+				if (present) { reuse.getUser().setListed_count((Long) value); }
+				break;
+			case FAVOURITES_COUNT:
+				if (present) { reuse.getUser().setFavourites_count((Long) value); }
+				break;
+			case STATUSES_COUNT:
+				if (present) { reuse.getUser().setStatuses_count((Long) value); }
+				break;
+
+			// ---- account settings ----
+			case CREATED_AT:
+				if (present) { reuse.getUser().setCreated_at((String) value); }
+				break;
+			case UTC_OFFSET:
+				if (present) { reuse.getUser().setUtc_offset((Long) value); }
+				break;
+			case TIME_ZONE:
+				if (present) { reuse.getUser().setTime_zone((String) value); }
+				break;
+			case GEO_ENABLED:
+				if (present) { reuse.getUser().setGeo_enabled((Boolean) value); }
+				break;
+			case LANG:
+				if (present) { reuse.getUser().setLang((String) value); }
+				break;
+			case CONTRIBUTORS_ENABLED:
+				if (present) { reuse.getUser().setContributors_enabled((Boolean) value); }
+				break;
+			case IS_TRANSLATOR:
+				if (present) { reuse.getUser().setIs_translator((Boolean) value); }
+				break;
+
+			// ---- profile appearance ----
+			case PROFILE_BACKGROUND_COLOR:
+				if (present) { reuse.getUser().setProfile_background_color((String) value); }
+				break;
+			case PROFILE_BACKGROUND_IMAGE_URL:
+				if (present) { reuse.getUser().setProfile_background_image_url((String) value); }
+				break;
+			case PROFILE_BACKGROUND_IMAGE_URL_HTTPS:
+				if (present) { reuse.getUser().setProfile_background_image_url_https((String) value); }
+				break;
+			case PROFILE_BACKGROUND_TILE:
+				if (present) { reuse.getUser().setProfile_background_tile((Boolean) value); }
+				break;
+			case PROFILE_LINK_COLOR:
+				if (present) { reuse.getUser().setProfile_link_color((String) value); }
+				break;
+			case PROFILE_SIDEBAR_BORDER_COLOR:
+				if (present) { reuse.getUser().setProfile_sidebar_border_color((String) value); }
+				break;
+			case PROFILE_SIDEBAR_FILL_COLOR:
+				if (present) { reuse.getUser().setProfile_sidebar_fill_color((String) value); }
+				break;
+			case PROFILE_TEXT_COLOR:
+				if (present) { reuse.getUser().setProfile_text_color((String) value); }
+				break;
+			case PROFILE_USE_BACKGROUND_IMAGE:
+				if (present) { reuse.getUser().setProfile_use_background_image((Boolean) value); }
+				break;
+			case PROFILE_IMAGE_URL:
+				if (present) { reuse.getUser().setProfile_image_url((String) value); }
+				break;
+			case PROFILE_IMAGE_URL_HTTPS:
+				if (present) { reuse.getUser().setProfile_image_url_https((String) value); }
+				break;
+			case PROFILE_BANNER_URL:
+				if (present) { reuse.getUser().setProfile_banner_url((String) value); }
+				break;
+			case DEFAULT_PROFILE:
+				if (present) { reuse.getUser().setDefault_profile((Boolean) value); }
+				break;
+			case DEFAULT_PROFILE_IMAGE:
+				if (present) { reuse.getUser().setDefault_profile_image((Boolean) value); }
+				break;
+
+			// ---- relationship flags ----
+			case FOLLOWING:
+				if (present) { reuse.getUser().setFollowing((Boolean) value); }
+				break;
+			case FOLLOW_REQUEST_SENT:
+				if (present) { reuse.getUser().setFollow_request_sent((Boolean) value); }
+				break;
+			case NOTIFICATIONS:
+				if (present) { reuse.getUser().setNotifications((Boolean) value); }
+				break;
+		}
+	}
+
+	/**
+	 * Assembles a coordinate pair from two consecutive primitive values.
+	 *
+	 * <p>Only acts while {@code entryState} is {@code COORDINATES}:
+	 * <ul>
+	 *   <li>a non-null value seen while {@code coordinatesCounter} is 0 is buffered in
+	 *       {@code coordinatesTemp} and the counter is incremented;</li>
+	 *   <li>a non-null value seen while the counter is 1 is stored, together with the
+	 *       buffered one, via {@code setCoordinates(buffered, value)};</li>
+	 *   <li>anything else (a {@code null} value or another counter value) stores the pair
+	 *       {@code (0.0, 0.0)}.</li>
+	 * </ul>
+	 *
+	 * <p>Values are converted through {@link Number#doubleValue()}, so a coordinate that
+	 * arrives as an integral number (for example boxed as {@code Long}) no longer fails with
+	 * a {@code ClassCastException}.
+	 *
+	 * @param value the next coordinate component, expected to be a {@link Number} or
+	 *              {@code null}
+	 */
+	public void coordinatesObjectStatePrimitiveHandler(Object value) {
+
+		switch (entryState) {
+			case COORDINATES:
+				if (value != null && this.coordinatesCounter == 0) {
+					coordinatesTemp = ((Number) value).doubleValue();
+					this.coordinatesCounter++;
+				} else if (value != null && this.coordinatesCounter == 1) {
+					// NOTE(review): the counter is not reset here; presumably it is reset
+					// elsewhere before the next pair is parsed -- confirm.
+					reuse.getCoordinates().setCoordinates(coordinatesTemp, ((Number) value).doubleValue());
+				} else {
+					reuse.getCoordinates().setCoordinates(0.0d, 0.0d);
+				}
+				break;
+		}
+
+	}
+
+	/**
+	 * Copies one primitive value into the place object returned by {@code reuse.getPlace()}.
+	 * The current {@code entryState} selects the property; every handled property is set
+	 * from the value cast to {@code String}. A {@code null} value is skipped, and entry
+	 * states without a case below are ignored.
+	 *
+	 * @param value the primitive to store, may be {@code null}
+	 */
+	public void placeObjectStatePrimitiveHandler(Object value) {
+		final boolean present = value != null;
+
+		switch (entryState) {
+			case ID:
+				if (present) { reuse.getPlace().setId((String) value); }
+				break;
+			case URL:
+				if (present) { reuse.getPlace().setUrl((String) value); }
+				break;
+			case PLACE_TYPE:
+				if (present) { reuse.getPlace().setPlace_type((String) value); }
+				break;
+			case NAME:
+				if (present) { reuse.getPlace().setName((String) value); }
+				break;
+			case FULL_NAME:
+				if (present) { reuse.getPlace().setFull_name((String) value); }
+				break;
+			case COUNTRY_CODE:
+				if (present) { reuse.getPlace().setCountry_code((String) value); }
+				break;
+			case COUNTRY:
+				if (present) { reuse.getPlace().setCountry((String) value); }
+				break;
+
+			// Skipped BoundingBox -- Not Required
+		}
+	}
+
+	/**
+	 * Copies one primitive value into the attributes object returned by
+	 * {@code reuse.getPlace().getAttributes()}. The current {@code entryState} selects the
+	 * property; every handled property is set from the value cast to {@code String}.
+	 * A {@code null} value is skipped, and entry states without a case below are ignored.
+	 *
+	 * @param value the primitive to store, may be {@code null}
+	 */
+	public void placeAttributesObjectStatePrimitiveHandler(Object value) {
+		final boolean present = value != null;
+
+		switch (entryState) {
+			case STREET_ADDRESS:
+				if (present) { reuse.getPlace().getAttributes().setStreet_address((String) value); }
+				break;
+			case LOCALITY:
+				if (present) { reuse.getPlace().getAttributes().setLocality((String) value); }
+				break;
+			case REGION:
+				if (present) { reuse.getPlace().getAttributes().setRegion((String) value); }
+				break;
+			case ISO3:
+				if (present) { reuse.getPlace().getAttributes().setIso3((String) value); }
+				break;
+			case POSTAL_CODE:
+				if (present) { reuse.getPlace().getAttributes().setPostal_code((String) value); }
+				break;
+			case PHONE:
+				if (present) { reuse.getPlace().getAttributes().setPhone((String) value); }
+				break;
+			case URL:
+				if (present) { reuse.getPlace().getAttributes().setUrl((String) value); }
+				break;
+			case APP_ID:
+				if (present) { reuse.getPlace().getAttributes().setAppId((String) value); }
+				break;
+
+			// Skipped BoundingBox -- Not Required
+		}
+	}
+
+	/**
+	 * Appends one {@code Contributors} element to {@code reuse.getContributors()} for the
+	 * given primitive value.
+	 *
+	 * <p>A {@code null} value appends an element left at its defaults. Otherwise the element
+	 * gets the single property selected by {@code entryState} ({@code ID}, {@code ID_STR} or
+	 * {@code TWEET_CONTRIBUTORS_SCREEN_NAME}); for any other entry state it is appended
+	 * unchanged.
+	 *
+	 * <p>NOTE(review): every call appends a new element, so the id, id_str and screen name of
+	 * one contributor end up in separate list entries -- confirm this is intended.
+	 *
+	 * @param value the primitive to store, may be {@code null}
+	 */
+	public void contributorsObjectStatePrimitiveHandler(Object value) {
+		final Contributors entry = new Contributors();
+
+		// "contributors" may be null in the input; in that case a default element is added.
+		if (value != null) {
+			switch (entryState) {
+				case ID:
+					entry.setId((Long) value);
+					break;
+				case ID_STR:
+					entry.setId_str((String) value);
+					break;
+				case TWEET_CONTRIBUTORS_SCREEN_NAME:
+					entry.setScreenName((String) value);
+					break;
+			}
+		}
+
+		reuse.getContributors().add(entry);
+	}
+
+	/**
+	 * Appends a hashtag to {@code reuse.getEntities().getHashtags()} when the current
+	 * {@code entryState} is {@code TEXT} and a value is present; every other call is a no-op.
+	 *
+	 * <p>The {@code HashTags} instance is created only once it is known that it will be
+	 * stored, instead of on every call.
+	 *
+	 * @param value the hashtag text, cast to {@code String}; may be {@code null}
+	 */
+	public void hashTagsObjectStatePrimitiveHandler(Object value) {
+
+		if (value == null || entryState != EntryState.TEXT) {
+			return;
+		}
+
+		HashTags hashTag = new HashTags();
+		hashTag.setText((String) value, false);
+		reuse.getEntities().getHashtags().add(hashTag);
+	}
+
+	/**
+	 * Kinds of objects a value can belong to.
+	 * NOTE(review): presumably selects which {@code *ObjectStatePrimitiveHandler} method
+	 * receives a primitive; the dispatch code is outside this excerpt -- confirm.
+	 */
+	private enum ObjectState {
+		TWEET, CONTRIBUTORS, USER,
+		GEO, COORDINATES,
+		PLACE, ATTRIBUTES, BOUNDING_BOX,
+		HASHTAGS
+	}
+
+	/**
+	 * Names of the entries the handler methods switch on. The declaration order is kept
+	 * exactly as before, so ordinals are unchanged; the line grouping is only for reading.
+	 */
+	private enum EntryState {
+		// tweet
+		TEXT, CREATED_AT, ID, ID_STR, SOURCE, TRUNCATED,
+		IN_REPLY_TO_STATUS_ID, IN_REPLY_TO_STATUS_ID_STR,
+		IN_REPLY_TO_USER_ID, IN_REPLY_TO_USER_ID_STR, IN_REPLY_TO_SCREEN_NAME,
+		RETWEET_COUNT, FAVORITE_COUNT, FAVORITED, RETWEETED,
+		POSSIBLY_SENSITIVE, FILTER_LEVEL,
+		// contributors
+		TWEET_CONTRIBUTORS_SCREEN_NAME,
+		// user
+		SCREEN_NAME, LOCATION, DESCRIPTION, PROTECTED, VERIFIED,
+		FOLLOWERS_COUNT, FRIENDS_COUNT, LISTED_COUNT, FAVOURITES_COUNT, STATUSES_COUNT,
+		UTC_OFFSET, TIME_ZONE, GEO_ENABLED, LANG, CONTRIBUTORS_ENABLED, IS_TRANSLATOR,
+		PROFILE_BACKGROUND_COLOR, PROFILE_BACKGROUND_IMAGE_URL,
+		PROFILE_BACKGROUND_IMAGE_URL_HTTPS, PROFILE_BACKGROUND_TILE,
+		PROFILE_LINK_COLOR, PROFILE_SIDEBAR_BORDER_COLOR, PROFILE_SIDEBAR_FILL_COLOR,
+		PROFILE_TEXT_COLOR, PROFILE_USE_BACKGROUND_IMAGE,
+		PROFILE_IMAGE_URL, PROFILE_IMAGE_URL_HTTPS, PROFILE_BANNER_URL,
+		DEFAULT_PROFILE, DEFAULT_PROFILE_IMAGE,
+		FOLLOWING, FOLLOW_REQUEST_SENT, NOTIFICATIONS,
+		// geo, coordinates and place
+		TYPE, COORDINATES, PLACE_TYPE, NAME, FULL_NAME, COUNTRY_CODE, COUNTRY,
+		BOUNDING_BOX, ATTRIBUTES,
+		STREET_ADDRESS, LOCALITY, REGION, ISO3, POSTAL_CODE, PHONE, URL,
+		// entities and media
+		ENTITIES, HASHTAGS, TRENDS, URLS, USER_MENTIONS, SYMBOLS, MEDIA, INDICES,
+		MEDIA_URL, MEDIA_URL_HTTPS, DISPLAY_URL, EXPANDED_URL,
+		SIZES, LARGE, W, H, RESIZE, SMALL, THUMB, MEDIUM,
+		// remaining entries
+		RETWEETED_STATUS, SOURCE_STATUS_ID, SOURCE_STATUS_ID_STR, SCOPES, FOLLOWERS,
+		APP_ID, UNEXPECTED
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
new file mode 100755
index 0000000..839b54f
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.User;
+
+
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * {@link Users} can be anyone or anything. They
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet tweet}, follow, create
+ * lists, have a home_timeline, can be mentioned, and can be looked up in bulk.
+ *
+ * <p>This is a plain mutable bean: every property has a getter and a setter and starts out
+ * with a non-null default (empty string, {@code 0}, {@code false}, or a fresh
+ * {@link Entities}). Instances can be recycled with {@link #reset()}.
+ */
+public class Users {
+
+	private boolean contributors_enabled;
+	private String created_at = "";
+	private boolean default_profile;
+	private boolean default_profile_image;
+	private String description = "";
+	private Entities entities;
+	private long favourites_count;
+	private boolean follow_request_sent;
+	private boolean following;
+	private long followers_count;
+	private long friends_count;
+	private boolean geo_enabled;
+	private long id;
+	private String id_str = "";
+	private boolean is_translator;
+	private String lang = "";
+	private long listed_count;
+	private String location = "";
+	private String name = "";
+	private boolean notifications;
+	private String profile_background_color = "";
+	private String profile_background_image_url = "";
+	private String profile_background_image_url_https = "";
+	private boolean profile_background_tile;
+	private String profile_banner_url = "";
+	private String profile_image_url = "";
+	private String profile_image_url_https = "";
+	private String profile_link_color = "";
+	private String profile_sidebar_border_color = "";
+	private String profile_sidebar_fill_color = "";
+	private String profile_text_color = "";
+	private boolean profile_use_background_image;
+	private boolean protected_tweet;
+	private String screen_name = "";
+	private long statuses_count;
+	private String time_zone = "";
+	private String url = "";
+	private long utc_offset;
+	private boolean verified;
+
+	/** Creates a user with every property at its default, see {@link #reset()}. */
+	public Users() {
+		reset();
+	}
+
+	/**
+	 * Puts every property back to its default: strings become empty, numbers {@code 0},
+	 * flags {@code false}, and {@code entities} a new {@link Entities} instance.
+	 * No field is left {@code null}; per the original author's note this avoids a problem
+	 * with Flink's Kryo serializer.
+	 */
+	public void reset() {
+
+		contributors_enabled = false;
+		created_at = "";
+		default_profile = false;
+		default_profile_image = false;
+		description = "";
+		entities = new Entities();
+		favourites_count = 0L;
+		follow_request_sent = false;
+		following = false;
+		followers_count = 0L;
+		friends_count = 0L;
+		geo_enabled = false;
+		id = 0L;
+		id_str = "";
+		is_translator = false;
+		lang = "";
+		listed_count = 0L;
+		location = "";
+		name = "";
+		notifications = false;
+		profile_background_color = "";
+		profile_background_image_url = "";
+		profile_background_image_url_https = "";
+		profile_background_tile = false;
+		profile_banner_url = "";
+		profile_image_url = "";
+		profile_image_url_https = "";
+		profile_link_color = "";
+		profile_sidebar_border_color = "";
+		profile_sidebar_fill_color = "";
+		profile_text_color = "";
+		profile_use_background_image = false;
+		protected_tweet = false;
+		screen_name = "";
+		statuses_count = 0L;
+		time_zone = "";
+		url = "";
+		utc_offset = 0L;
+		verified = false;
+
+	}
+
+	/**
+	 * Formats the current time with the pattern {@code "EEE MMM d HH:mm:ss Z yyyy"}, using
+	 * the default locale and time zone of {@link SimpleDateFormat}. Not called from within
+	 * this class.
+	 */
+	private String getUTCTime() {
+
+		Date date = new Date();
+		SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
+		return format.format(date);
+
+	}
+
+	public boolean isContributors_enabled() {
+		return contributors_enabled;
+	}
+
+	public void setContributors_enabled(boolean contributors_enabled) {
+		this.contributors_enabled = contributors_enabled;
+	}
+
+	public String getCreated_at() {
+		return created_at;
+	}
+
+	public void setCreated_at(String created_at) {
+		this.created_at = created_at;
+	}
+
+	public boolean isDefault_profile() {
+		return default_profile;
+	}
+
+	public void setDefault_profile(boolean default_profile) {
+		this.default_profile = default_profile;
+	}
+
+	public boolean isDefault_profile_image() {
+		return default_profile_image;
+	}
+
+	public void setDefault_profile_image(boolean default_profile_image) {
+		this.default_profile_image = default_profile_image;
+	}
+
+	public String getDescription() {
+		return description;
+	}
+
+	public void setDescription(String description) {
+		this.description = description;
+	}
+
+	public Entities getEntities() {
+		return entities;
+	}
+
+	public void setEntities(Entities entities) {
+		this.entities = entities;
+	}
+
+	public long getFavourites_count() {
+		return favourites_count;
+	}
+
+	public void setFavourites_count(long favourites_count) {
+		this.favourites_count = favourites_count;
+	}
+
+	public boolean isFollow_request_sent() {
+		return follow_request_sent;
+	}
+
+	public void setFollow_request_sent(boolean follow_request_sent) {
+		this.follow_request_sent = follow_request_sent;
+	}
+
+	public boolean isFollowing() {
+		return following;
+	}
+
+	public void setFollowing(boolean following) {
+		this.following = following;
+	}
+
+	public long getFollowers_count() {
+		return followers_count;
+	}
+
+	public void setFollowers_count(long followers_count) {
+		this.followers_count = followers_count;
+	}
+
+	public long getFriends_count() {
+		return friends_count;
+	}
+
+	public void setFriends_count(long friends_count) {
+		this.friends_count = friends_count;
+	}
+
+	public boolean isGeo_enabled() {
+		return geo_enabled;
+	}
+
+	public void setGeo_enabled(boolean geo_enabled) {
+		this.geo_enabled = geo_enabled;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	/**
+	 * Returns the string form of the id exactly as it was stored with
+	 * {@link #setId_str(String)} (empty after {@link #reset()}).
+	 *
+	 * <p>Earlier this getter returned {@code Long.toString(id)}, which ignored the stored
+	 * field and therefore lost the original string whenever it differed from the numeric id.
+	 */
+	public String getId_str() {
+		return id_str;
+	}
+
+	public void setId_str(String id_str) {
+		this.id_str = id_str;
+	}
+
+	public boolean isIs_translator() {
+		return is_translator;
+	}
+
+	public void setIs_translator(boolean is_translator) {
+		this.is_translator = is_translator;
+	}
+
+	public String getLang() {
+		return lang;
+	}
+
+	public void setLang(String lang) {
+		this.lang = lang;
+	}
+
+	public long getListed_count() {
+		return listed_count;
+	}
+
+	public void setListed_count(long listed_count) {
+		this.listed_count = listed_count;
+	}
+
+	public String getLocation() {
+		return location;
+	}
+
+	public void setLocation(String location) {
+		this.location = location;
+	}
+
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public boolean isNotifications() {
+		return notifications;
+	}
+
+	public void setNotifications(boolean notifications) {
+		this.notifications = notifications;
+	}
+
+	public String getProfile_background_color() {
+		return profile_background_color;
+	}
+
+	public void setProfile_background_color(String profile_background_color) {
+		this.profile_background_color = profile_background_color;
+	}
+
+	public String getProfile_background_image_url() {
+		return profile_background_image_url;
+	}
+
+	public void setProfile_background_image_url(String profile_background_image_url) {
+		this.profile_background_image_url = profile_background_image_url;
+	}
+
+	public String getProfile_background_image_url_https() {
+		return profile_background_image_url_https;
+	}
+
+	public void setProfile_background_image_url_https(String profile_background_image_url_https) {
+		this.profile_background_image_url_https = profile_background_image_url_https;
+	}
+
+	public boolean isProfile_background_tile() {
+		return profile_background_tile;
+	}
+
+	public void setProfile_background_tile(boolean profile_background_tile) {
+		this.profile_background_tile = profile_background_tile;
+	}
+
+	public String getProfile_banner_url() {
+		return profile_banner_url;
+	}
+
+	public void setProfile_banner_url(String profile_banner_url) {
+		this.profile_banner_url = profile_banner_url;
+	}
+
+	public String getProfile_image_url() {
+		return profile_image_url;
+	}
+
+	public void setProfile_image_url(String profile_image_url) {
+		this.profile_image_url = profile_image_url;
+	}
+
+	public String getProfile_image_url_https() {
+		return profile_image_url_https;
+	}
+
+	public void setProfile_image_url_https(String profile_image_url_https) {
+		this.profile_image_url_https = profile_image_url_https;
+	}
+
+	public String getProfile_link_color() {
+		return profile_link_color;
+	}
+
+	public void setProfile_link_color(String profile_link_color) {
+		this.profile_link_color = profile_link_color;
+	}
+
+	public String getProfile_sidebar_border_color() {
+		return profile_sidebar_border_color;
+	}
+
+	public void setProfile_sidebar_border_color(String profile_sidebar_border_color) {
+		this.profile_sidebar_border_color = profile_sidebar_border_color;
+	}
+
+	public String getProfile_sidebar_fill_color() {
+		return profile_sidebar_fill_color;
+	}
+
+	public void setProfile_sidebar_fill_color(String profile_sidebar_fill_color) {
+		this.profile_sidebar_fill_color = profile_sidebar_fill_color;
+	}
+
+	public String getProfile_text_color() {
+		return profile_text_color;
+	}
+
+	public void setProfile_text_color(String profile_text_color) {
+		this.profile_text_color = profile_text_color;
+	}
+
+	public boolean isProfile_use_background_image() {
+		return profile_use_background_image;
+	}
+
+	public void setProfile_use_background_image(boolean profile_use_background_image) {
+		this.profile_use_background_image = profile_use_background_image;
+	}
+
+	public boolean isProtected_tweet() {
+		return protected_tweet;
+	}
+
+	public void setProtected_tweet(boolean protected_tweet) {
+		this.protected_tweet = protected_tweet;
+	}
+
+	public String getScreen_name() {
+		return screen_name;
+	}
+
+	public void setScreen_name(String screen_name) {
+		this.screen_name = screen_name;
+	}
+
+	public long getStatuses_count() {
+		return statuses_count;
+	}
+
+	public void setStatuses_count(long statuses_count) {
+		this.statuses_count = statuses_count;
+	}
+
+	public String getTime_zone() {
+		return time_zone;
+	}
+
+	public void setTime_zone(String time_zone) {
+		this.time_zone = time_zone;
+	}
+
+	public String getUrl() {
+		return url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+
+	public long getUtc_offset() {
+		return utc_offset;
+	}
+
+	public void setUtc_offset(long utc_offset) {
+		this.utc_offset = utc_offset;
+	}
+
+	public boolean isVerified() {
+		return verified;
+	}
+
+	public void setVerified(boolean verified) {
+		this.verified = verified;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
new file mode 100755
index 0000000..7cf9695
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+/**
+ * Mutable bean holding the string attributes of a place. Every property starts as an
+ * empty string, except {@code twitter}, which is fixed to {@code "twitter"} and has no
+ * setter.
+ */
+public class Attributes {
+
+	// postal address parts
+	private String street_address = "";
+	private String locality = "";
+	private String region = "";
+	private String iso3 = "";
+	private String postal_code = "";
+
+	// contact details
+	private String phone = "";
+	private String twitter = "twitter";
+	private String url = "";
+
+	// in the API it is app:id !!
+	private String appId = "";
+
+	/** Creates an attributes object with the defaults described on the class. */
+	public Attributes() {
+	}
+
+	/** @return the street address, empty by default */
+	public String getStreet_address() {
+		return this.street_address;
+	}
+
+	public void setStreet_address(String street_address) {
+		this.street_address = street_address;
+	}
+
+	/** @return the locality, empty by default */
+	public String getLocality() {
+		return this.locality;
+	}
+
+	public void setLocality(String locality) {
+		this.locality = locality;
+	}
+
+	/** @return the region, empty by default */
+	public String getRegion() {
+		return this.region;
+	}
+
+	public void setRegion(String region) {
+		this.region = region;
+	}
+
+	/** @return the iso3 value, empty by default */
+	public String getIso3() {
+		return this.iso3;
+	}
+
+	public void setIso3(String iso3) {
+		this.iso3 = iso3;
+	}
+
+	/** @return the postal code, empty by default */
+	public String getPostal_code() {
+		return this.postal_code;
+	}
+
+	public void setPostal_code(String postal_code) {
+		this.postal_code = postal_code;
+	}
+
+	/** @return the phone number, empty by default */
+	public String getPhone() {
+		return this.phone;
+	}
+
+	public void setPhone(String phone) {
+		this.phone = phone;
+	}
+
+	/** @return the {@code twitter} value; there is no setter, so it stays {@code "twitter"} */
+	public String getTwitter() {
+		return this.twitter;
+	}
+
+	/** @return the url, empty by default */
+	public String getUrl() {
+		return this.url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+
+	/** @return the app id, empty by default */
+	public String getAppId() {
+		return this.appId;
+	}
+
+	public void setAppId(String appId) {
+		this.appId = appId;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
new file mode 100755
index 0000000..e885028
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A series of longitude and latitude points that define a box containing the Place entity
+ * this bounding box belongs to. Each point is an array of the form [longitude, latitude].
+ * Points are grouped into one list per bounding box, and those lists are wrapped in one
+ * more list to be compatible with the polygon notation.
+ */
+public class BoundingBox {
+
+	/** Outer list: one entry per box; inner list: the points of that box. */
+	private List<List<double[]>> coordinates = new ArrayList<List<double[]>>();
+
+	/** Geometry type, {@code "Polygon"} unless changed through {@link #setType(String)}. */
+	private String type = "Polygon";
+
+	/** Creates a bounding box without any points. */
+	public BoundingBox() {
+	}
+
+	/**
+	 * Creates a bounding box holding a single list of points.
+	 *
+	 * @param points the points of the box; the list is stored as given, not copied
+	 */
+	public BoundingBox(List<double[]> points) {
+		coordinates.add(points);
+	}
+
+	/** @return the stored list of point lists (the internal instance, not a copy) */
+	public List<List<double[]>> getCoordinates() {
+		return this.coordinates;
+	}
+
+	/** @param coordinates the list of point lists that replaces the current one */
+	public void setCoordinates(List<List<double[]>> coordinates) {
+		this.coordinates = coordinates;
+	}
+
+	/** @return the geometry type */
+	public String getType() {
+		return this.type;
+	}
+
+	/** @param type the new geometry type */
+	public void setType(String type) {
+		this.type = type;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
new file mode 100755
index 0000000..723644c
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+/**
+ * {@link org.apache.flink.contrib.tweetinputformat.model.places.Places} are specific, named
+ * locations with corresponding geo
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Coordinates}. They can be
+ * attached to a {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} by
+ * specifying a place_id when tweeting.<br>
+ * Tweets associated with places are not necessarily issued from that location but could
+ * also potentially be about that location.<br>
+ * Places can be searched for. Tweets can also be found by place_id.
+ *
+ * <p>Mutable bean: string properties start empty, and {@code attributes} and
+ * {@code bounding_box} start as fresh default instances.
+ */
+public class Places {
+
+	private Attributes attributes = new Attributes();
+	private BoundingBox bounding_box = new BoundingBox();
+
+	private String country = "";
+	private String country_code = "";
+	private String full_name = "";
+	private String id = "";
+	private String name = "";
+	private String place_type = "";
+	private String url = "";
+
+	/** Creates a place with the defaults described on the class. */
+	public Places() {
+	}
+
+	/** @return the attributes object (the internal instance, not a copy) */
+	public Attributes getAttributes() {
+		return this.attributes;
+	}
+
+	public void setAttributes(Attributes attributes) {
+		this.attributes = attributes;
+	}
+
+	/** @return the bounding box (the internal instance, not a copy) */
+	public BoundingBox getBounding_box() {
+		return this.bounding_box;
+	}
+
+	public void setBounding_box(BoundingBox bounding_box) {
+		this.bounding_box = bounding_box;
+	}
+
+	/** @return the country, empty by default */
+	public String getCountry() {
+		return this.country;
+	}
+
+	public void setCountry(String country) {
+		this.country = country;
+	}
+
+	/** @return the country code, empty by default */
+	public String getCountry_code() {
+		return this.country_code;
+	}
+
+	public void setCountry_code(String country_code) {
+		this.country_code = country_code;
+	}
+
+	/** @return the full name, empty by default */
+	public String getFull_name() {
+		return this.full_name;
+	}
+
+	public void setFull_name(String full_name) {
+		this.full_name = full_name;
+	}
+
+	/** @return the place id, empty by default */
+	public String getId() {
+		return this.id;
+	}
+
+	public void setId(String id) {
+		this.id = id;
+	}
+
+	/** @return the name, empty by default */
+	public String getName() {
+		return this.name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/** @return the place type, empty by default */
+	public String getPlace_type() {
+		return this.place_type;
+	}
+
+	public void setPlace_type(String place_type) {
+		this.place_type = place_type;
+	}
+
+	/** @return the url, empty by default */
+	public String getUrl() {
+		return this.url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
new file mode 100755
index 0000000..85968eb
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
+/**
+ * Nullable. A collection of brief user objects (usually only one) indicating users who contributed
+ * to the authorship of the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} on behalf of the
+ * official tweet author.
+ */
+public class Contributors {
+
+	// Primitive long: the constructor, getter and setter all work with long, so no boxing is needed.
+	private long id = 0L;
+
+	private String id_str = "";
+
+	private String screenName = "";
+
+	public Contributors() {
+		reset();
+	}
+
+	public Contributors(long id, String id_str, String screenName) {
+		// Stores the given values as-is; no validation is performed.
+		this.id = id;
+		this.id_str = id_str;
+		this.screenName = screenName;
+	}
+
+	public void reset() {
+		// Restores every field to the same defaults used by the field initializers.
+		id = 0L;
+		id_str = "";
+		screenName = "";
+
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getId_str() {
+		return id_str;
+	}
+
+	public void setId_str(String id_str) {
+		this.id_str = id_str;
+	}
+
+	public String getScreenName() {
+		return screenName;
+	}
+
+	public void setScreenName(String screenName) {
+		this.screenName = screenName;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
new file mode 100755
index 0000000..c697e98
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
+/**
+ * Nullable. Represents the geographic location of this
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} as reported by the user or client
+ * application. The inner coordinates array is formatted as geoJSON (longitude first, then latitude).
+ */
+public class Coordinates {
+	// Initialized to "point"; this class exposes a getter for it but no setter.
+	private String type = "point";
+	// Index 0 holds the longitude and index 1 the latitude (see the two-argument setter).
+	private double[] coordinates = new double[2];
+
+	public Coordinates() {
+
+	}
+
+	public double[] getCoordinates() {
+		return coordinates;
+	}
+	// Stores the caller's array directly: no copy, no null check and no length check.
+	public void setCoordinates(double[] coordinates) {
+		this.coordinates = coordinates;
+	}
+
+	public void setCoordinates(double longitude, double latitude) {
+		this.coordinates[0] = longitude;
+		this.coordinates[1] = latitude;
+	}
+
+	public String getType() {
+		return type;
+	}
+	// Reads coordinates[0] and coordinates[1], so the array must hold at least two values.
+	@Override
+	public String toString() {
+		return "longitude = " + this.coordinates[0] + "  latitude = " + this.coordinates[1];
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
new file mode 100755
index 0000000..f5a43a2
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
+/**
+ * Details the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} ID of the user’s own retweet (if
+ * existent) of this {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
+ */
+public class CurrentUserRetweet {
+
+	private long id;
+
+	private String id_str = "";
+
+	public CurrentUserRetweet() {
+		reset();
+	}
+
+	public void reset() {
+		id = 0L;
+		id_str = "";
+
+	}
+
+	public long getId() {
+		return id;
+	}
+	// Updates only the numeric id; id_str keeps its previous value until setId_str() is called.
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getId_str() {
+		return id_str;
+	}
+	// Takes no argument: id_str is derived from the current numeric id.
+	public void setId_str() {
+		this.id_str = Long.toString(id);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
new file mode 100755
index 0000000..a2986a3
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
+import org.apache.flink.contrib.tweetinputformat.model.User.Users;
+import org.apache.flink.contrib.tweetinputformat.model.places.Places;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Tweet {
+
+	private List<Contributors> contributors;
+
+	private Coordinates coordinates;
+
+	private String created_at = "";
+
+	private Entities entities;
+
+	private long favorite_count;
+
+	private boolean favorited;
+
+	private String filter_level = "";
+
+	private long id;
+
+	private String id_str = "";
+
+	private String in_reply_to_screen_name = "";
+
+	private long in_reply_to_status_id;
+
+	private String in_reply_to_status_id_str = "";
+
+	private long in_reply_to_user_id;
+
+	private String in_reply_to_user_id_str = "";
+
+	private String lang = "";
+
+	// Places
+	private Places place;
+
+	private boolean possibly_sensitive;
+
+	private long retweet_count;
+
+	private boolean retweeted;
+
+	private CurrentUserRetweet currentUserRetweet;
+
+	private String source = "";
+
+	private String text = "";
+
+	private boolean truncated;
+
+	private Users user;
+
+	// to handle retweeted_status
+	private Tweet retweeted_status;
+
+	private int tweetLevel;
+
+	public Tweet() {
+		tweetLevel = 0;
+		reset(tweetLevel);
+	}
+
+	public Tweet(int level) {
+		tweetLevel = level;
+		reset(tweetLevel);
+	}
+
+
+	// Re-initializes the fields to their defaults; per the original note: to avoid FLINK KRYO serializer problem
+	public void reset(int level) {
+
+		contributors = new ArrayList<Contributors>();
+		coordinates = new Coordinates();
+		created_at = "";
+		entities = new Entities();
+		favorite_count = 0L;
+		favorited = false;
+		filter_level = "";
+		id = 0L;
+		id_str = "";
+		in_reply_to_screen_name = "";
+		in_reply_to_status_id = 0L;
+		in_reply_to_status_id_str = "";
+		in_reply_to_user_id = 0L;
+		in_reply_to_user_id_str = "";
+		lang = "";
+		place = new Places();
+		possibly_sensitive = false;
+		retweet_count = 0L;
+
+		// to handle retweeted_status: only a level-0 tweet gets a nested Tweet, which is created at level 1
+		if (level == 0) {
+			retweeted_status = new Tweet(++level);
+		}
+		// For any other level, retweeted_status is left untouched by this method.
+
+		currentUserRetweet = new CurrentUserRetweet();
+		retweeted = false;
+		source = "";
+		text = "";
+		truncated = false;
+		user = new Users();
+
+	}
+
+	public List<Contributors> getContributors() {
+		return contributors;
+	}
+
+	public void setContributors(List<Contributors> contributors) {
+		this.contributors = contributors;
+	}
+
+	public Coordinates getCoordinates() {
+		return coordinates;
+	}
+
+	public void setCoordinates(Coordinates coordinates) {
+		this.coordinates = coordinates;
+	}
+
+	public String getCreated_at() {
+		return created_at;
+	}
+
+	public void setCreated_at(String created_at) {
+		this.created_at = created_at;
+	}
+
+	public Entities getEntities() {
+		return entities;
+	}
+
+	public void setEntities(Entities entities) {
+		this.entities = entities;
+	}
+
+	public long getFavorite_count() {
+		return favorite_count;
+	}
+
+	public void setFavorite_count(long favorite_count) {
+		this.favorite_count = favorite_count;
+	}
+
+	public boolean isFavorited() {
+		return favorited;
+	}
+
+	public void setFavorited(boolean favorited) {
+		this.favorited = favorited;
+	}
+
+	public String getFilter_level() {
+		return filter_level;
+	}
+
+	public void setFilter_level(String filter_level) {
+		this.filter_level = filter_level;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getId_str() {
+		return id_str;
+	}
+
+	public void setId_str(String id_str) {
+		this.id_str = id_str;
+	}
+
+	public String getIn_reply_to_screen_name() {
+		return in_reply_to_screen_name;
+	}
+
+	public void setIn_reply_to_screen_name(String in_reply_to_screen_name) {
+		this.in_reply_to_screen_name = in_reply_to_screen_name;
+	}
+
+
+	public long getIn_reply_to_status_id() {
+		return in_reply_to_status_id;
+	}
+
+	public void setIn_reply_to_status_id(long in_reply_to_status_id) {
+		this.in_reply_to_status_id = in_reply_to_status_id;
+	}
+
+	public String getIn_reply_to_status_id_str() {
+		return in_reply_to_status_id_str;
+	}
+
+	public void setIn_reply_to_status_id_str(String in_reply_to_status_id_str) {
+		this.in_reply_to_status_id_str = in_reply_to_status_id_str;
+	}
+
+	public long getIn_reply_to_user_id() {
+		return in_reply_to_user_id;
+	}
+
+	public void setIn_reply_to_user_id(long in_reply_to_user_id) {
+		this.in_reply_to_user_id = in_reply_to_user_id;
+	}
+
+	public String getIn_reply_to_user_id_str() {
+		return in_reply_to_user_id_str;
+	}
+
+	public void setIn_reply_to_user_id_str(String in_reply_to_user_id_str) {
+		this.in_reply_to_user_id_str = in_reply_to_user_id_str;
+	}
+
+	public String getLang() {
+		return lang;
+	}
+
+	public void setLang(String lang) {
+		this.lang = lang;
+	}
+
+	public Places getPlace() {
+		return place;
+	}
+
+	public void setPlace(Places place) {
+		this.place = place;
+	}
+
+	public boolean getPossibly_sensitive() {
+		return possibly_sensitive;
+	}
+
+	public void setPossibly_sensitive(boolean possibly_sensitive) {
+		this.possibly_sensitive = possibly_sensitive;
+	}
+
+	public long getRetweet_count() {
+		return retweet_count;
+	}
+
+	public void setRetweet_count(long retweet_count) {
+		this.retweet_count = retweet_count;
+	}
+
+	public boolean isRetweeted() {
+		return retweeted;
+	}
+
+	public void setRetweeted(boolean retweeted) {
+		this.retweeted = retweeted;
+	}
+
+	public String getSource() {
+		return source;
+	}
+
+	public void setSource(String source) {
+		this.source = source;
+	}
+
+	public String getText() {
+		return text;
+	}
+
+	public void setText(String text) {
+		this.text = text;
+	}
+
+	public boolean isTruncated() {
+		return truncated;
+	}
+
+	public void setTruncated(boolean truncated) {
+		this.truncated = truncated;
+	}
+
+	public Users getUser() {
+		return user;
+	}
+
+	public void setUser(Users user) {
+		this.user = user;
+	}
+
+	public CurrentUserRetweet getCurrentUserRetweet() {
+		return currentUserRetweet;
+	}
+
+	public void setCurrentUserRetweet(CurrentUserRetweet currentUserRetweet) {
+		this.currentUserRetweet = currentUserRetweet;
+	}
+
+
+	public boolean isPossibly_sensitive() {
+		return possibly_sensitive;
+	}
+
+	public Tweet getRetweeted_status() {
+		return retweeted_status;
+	}
+
+	public void setRetweeted_status(Tweet retweeted_status) {
+		this.retweeted_status = retweeted_status;
+	}
+
+	public int getTweetLevel() {
+		return tweetLevel;
+	}
+
+	public void setTweetLevel(int tweetLevel) {
+		this.tweetLevel = tweetLevel;
+	}
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
new file mode 100755
index 0000000..d88ea34
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entities which have been parsed out of the text of the
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
+ */
+public class Entities {
+
+	private List<HashTags> hashtags;
+
+	private List<Media> media;
+
+	private List<URL> urls;
+
+	private List<UserMention> user_mentions;
+
+	private List<Symbol> symbols;
+
+	public Entities() {
+
+		hashtags = new ArrayList<HashTags>();
+		media = new ArrayList<Media>();
+		urls = new ArrayList<URL>();
+		user_mentions = new ArrayList<UserMention>();
+		symbols = new ArrayList<Symbol>();
+
+	}
+
+	public List<HashTags> getHashtags() {
+		return hashtags;
+	}
+
+	public void setHashtags(List<HashTags> hashtags) {
+		this.hashtags = hashtags;
+	}
+
+	public List<Media> getMedia() {
+		return media;
+	}
+
+	public void setMedia(List<Media> media) {
+		this.media = media;
+	}
+
+	public List<URL> getUrls() {
+		return urls;
+	}
+
+	public void setUrls(List<URL> urls) {
+		this.urls = urls;
+	}
+
+	public List<UserMention> getUser_mentions() {
+		return user_mentions;
+	}
+
+	public void setUser_mentions(List<UserMention> user_mentions) {
+		this.user_mentions = user_mentions;
+	}
+
+
+	public List<Symbol> getSymbols() {
+		return symbols;
+	}
+
+	public void setSymbols(List<Symbol> symbols) {
+		this.symbols = symbols;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
new file mode 100755
index 0000000..1900859
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
+/**
+ * Represents hashtags which have been parsed out of the
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text.
+ */
+
+public class HashTags {
+
+	private long[] indices = new long[2];
+
+	private String text = "";
+
+	public long[] getIndices() {
+		return indices;
+	}
+
+	public void setIndices(long[] indices) {
+		this.indices = indices;
+	}
+
+	public void setIndices(long start, long end) {
+		this.indices[0] = start;
+		this.indices[1] = end;
+	}
+
+	public String getText() {
+		return text;
+	}
+
+	// With hashExist, stores the part of text after index indices[0] (up to the end of text); otherwise text as-is.
+	// NOTE(review): indices[1] is never used here and indices[0] + 1 is not range-checked - confirm against callers.
+	public void setText(String text, boolean hashExist) {
+		if (hashExist) {
+			this.text = text.substring((int) indices[0] + 1);
+		} else {
+			this.text = text;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
new file mode 100755
index 0000000..f006aac
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents media elements uploaded with the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
+ */
+public class Media {
+
+
+	private String display_url = "";
+
+	private String expanded_url = "";
+
+	private long id;
+
+	private String id_str = "";
+
+	private long[] indices;
+
+	private String media_url = "";
+
+	private String media_url_https = "";
+
+	private Map<String, Size> sizes;
+
+	private String type = "";
+
+	private String url = "";
+
+	public Media() {
+		// Fields are assigned directly so that no overridable method runs during construction.
+		this.display_url = "";
+		this.expanded_url = "";
+		this.id = 0L;
+		this.id_str = "";
+		this.indices = new long[]{0L, 0L};
+		this.media_url = "";
+		this.media_url_https = "";
+		this.sizes = new HashMap<String, Size>();
+		this.type = "";
+		this.url = "";
+
+	}
+
+	public String getDisplay_url() {
+		return display_url;
+	}
+
+	public void setDisplay_url(String display_url) {
+		this.display_url = display_url;
+	}
+
+	public String getExpanded_url() {
+		return expanded_url;
+	}
+
+	public void setExpanded_url(String expanded_url) {
+		this.expanded_url = expanded_url;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public void setId(long id) {
+		this.id = id;
+	}
+
+	public String getId_str() {
+		return id_str;
+	}
+
+	public void setId_str(String id_str) {
+		this.id_str = id_str;
+	}
+
+	public long[] getIndices() {
+		return indices;
+	}
+
+	public void setIndices(long[] indices) {
+		this.indices = indices;
+	}
+
+	public String getMedia_url() {
+		return media_url;
+	}
+
+	public void setMedia_url(String media_url) {
+		this.media_url = media_url;
+	}
+
+	public String getMedia_url_https() {
+		return media_url_https;
+	}
+
+	public void setMedia_url_https(String media_url_https) {
+		this.media_url_https = media_url_https;
+	}
+
+	public Map<String, Size> getSizes() {
+		return sizes;
+	}
+
+	public void setSizes(Map<String, Size> sizes) {
+		this.sizes = sizes;
+	}
+
+	public String getType() {
+		return type;
+	}
+
+	public void setType(String type) {
+		this.type = type;
+	}
+
+	public String getUrl() {
+		return url;
+	}
+
+	public void setUrl(String url) {
+		this.url = url;
+	}
+}