You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:12 UTC
[22/27] flink git commit: [storm-compat] Moved Storm-compatibility to
flink-contrib and split flink-contrib into small sub-projects
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
new file mode 100644
index 0000000..53b84ef
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.InetAddress;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * A specialized data sink to be used by {@code DataStreamUtils.collect}.
+ *
+ * <p>Every incoming element is serialized with the given {@link TypeSerializer} and written to
+ * a socket connection that is opened in {@link #open(Configuration)} and flushed and closed in
+ * {@link #close()}. The sink holds a single connection, so it is meant to run with parallelism 1.
+ */
+class CollectSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(CollectSink.class);
+
+	private final InetAddress hostIp;
+	private final int port;
+	private final TypeSerializer<IN> serializer;
+
+	/** Connection to the socket server; null until {@link #initializeConnection()} succeeds. */
+	private transient Socket client;
+
+	/** Output view over the socket's stream that elements are serialized into (not serializable). */
+	private transient StreamWriterDataOutputView streamWriter;
+
+	/**
+	 * Creates a CollectSink that will send the data to the specified host.
+	 *
+	 * @param hostIp IP address of the Socket server.
+	 * @param port Port of the Socket server.
+	 * @param serializer A serializer for the data.
+	 */
+	public CollectSink(InetAddress hostIp, int port, TypeSerializer<IN> serializer) {
+		this.hostIp = hostIp;
+		this.port = port;
+		this.serializer = serializer;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 *
+	 * @throws RuntimeException if the connection cannot be established
+	 */
+	public void initializeConnection() {
+		try {
+			client = new Socket(hostIp, port);
+			streamWriter = new StreamWriterDataOutputView(client.getOutputStream());
+		} catch (IOException e) {
+			// Do not leak a socket that was opened but whose stream could not be obtained.
+			closeClientQuietly();
+			throw new RuntimeException(e);
+		}
+	}
+
+	/**
+	 * Called when new data arrives to the sink; serializes it and forwards it to the Socket.
+	 * Write failures are logged and the element is dropped.
+	 *
+	 * @param value
+	 *            The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		try {
+			serializer.serialize(value, streamWriter);
+		} catch (IOException e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Cannot send message to socket server at " + hostIp.toString() + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Flushes pending data and closes the connection of the Socket client.
+	 *
+	 * <p>The socket is closed exactly once, also when flushing fails, and nothing happens if the
+	 * connection was never opened. A failure to close the socket itself is only logged.
+	 *
+	 * @throws RuntimeException if the pending data cannot be flushed to the socket server
+	 */
+	private void closeConnection() {
+		try {
+			if (streamWriter != null) {
+				// Flush the stream the serializer actually writes to.
+				streamWriter.flush();
+			}
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostIp.toString() + ":" + port, e);
+		} finally {
+			streamWriter = null;
+			closeClientQuietly();
+		}
+	}
+
+	/** Closes {@link #client} if it is open, logging instead of throwing on failure. */
+	private void closeClientQuietly() {
+		if (client == null) {
+			return;
+		}
+		try {
+			client.close();
+		} catch (IOException e) {
+			LOG.error("Cannot close connection with socket server at "
+					+ hostIp.toString() + ":" + port, e);
+		} finally {
+			client = null;
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 *
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		initializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+	/** A {@link DataOutputStream} that can be used as a Flink {@link DataOutputView}. */
+	private static class StreamWriterDataOutputView extends DataOutputStream implements DataOutputView {
+
+		public StreamWriterDataOutputView(OutputStream stream) {
+			super(stream);
+		}
+
+		/** "Skips" bytes on an output stream by writing {@code numBytes} zero bytes. */
+		@Override
+		public void skipBytesToWrite(int numBytes) throws IOException {
+			for (int i = 0; i < numBytes; i++) {
+				write(0);
+			}
+		}
+
+		/** Copies exactly {@code numBytes} bytes from {@code source} to this stream. */
+		@Override
+		public void write(DataInputView source, int numBytes) throws IOException {
+			byte[] data = new byte[numBytes];
+			source.readFully(data);
+			write(data);
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
new file mode 100644
index 0000000..a98740e
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamIterator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.util.Iterator;
+import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.EOFException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.io.DataInputStream;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+
+/**
+ * Iterator over the elements that {@code DataStreamUtils.collect} receives over a TCP connection.
+ *
+ * <p>The constructor opens a server socket on an automatically allocated port (see
+ * {@link #getPort()}) and starts a background thread that accepts exactly one connection.
+ * Elements are deserialized on demand with the given {@link TypeSerializer}; the end of the
+ * TCP stream ends the iteration.
+ */
+class DataStreamIterator<T> implements Iterator<T> {
+
+	ServerSocket socket;
+	InputStream tcpStream;
+
+	/** Element read ahead by {@link #hasNext()}, or null if none is buffered. */
+	T next;
+
+	/** Released by the accept thread once it has finished, whether or not it succeeded. */
+	private final CountDownLatch connectionAccepted = new CountDownLatch(1);
+	private volatile StreamReaderDataInputView streamReader;
+
+	/** Set by the accept thread if accepting the connection failed. */
+	private volatile IOException acceptFailure;
+
+	private final TypeSerializer<T> serializer;
+
+	DataStreamIterator(TypeSerializer<T> serializer) {
+		this.serializer = serializer;
+		try {
+			socket = new ServerSocket(0, 1, null);
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator: an I/O error occurred when opening the socket", e);
+		}
+		new AcceptThread().start();
+	}
+
+	/** Accepts the single incoming connection, then closes the listening socket. */
+	private class AcceptThread extends Thread {
+
+		AcceptThread() {
+			super("DataStreamIterator accept thread");
+			// Do not keep the JVM alive just because the sink never connected.
+			setDaemon(true);
+		}
+
+		@Override
+		public void run() {
+			try {
+				tcpStream = socket.accept().getInputStream();
+				streamReader = new StreamReaderDataInputView(tcpStream);
+			} catch (IOException e) {
+				acceptFailure = e;
+			} finally {
+				try {
+					// Only one connection is ever accepted, so the listener can be released.
+					socket.close();
+				} catch (IOException ignored) {
+					// Best effort: closing the listener does not affect an accepted connection.
+				}
+				// Count down even on failure so that readers fail instead of blocking forever.
+				connectionAccepted.countDown();
+			}
+		}
+	}
+
+	/**
+	 * Returns the port on which the iterator is getting the data. (Used internally.)
+	 * @return The port
+	 */
+	public int getPort() {
+		return socket.getLocalPort();
+	}
+
+	/**
+	 * Returns true if the DataStream has more elements.
+	 * (Note: blocks if there will be more elements, but they are not available yet.)
+	 * @return true if the DataStream has more elements
+	 */
+	@Override
+	public boolean hasNext() {
+		if (next == null) {
+			readNextFromStream();
+		}
+		return next != null;
+	}
+
+	/**
+	 * Returns the next element of the DataStream. (Blocks if it is not available yet.)
+	 * @return The element
+	 * @throws NoSuchElementException if the stream has already ended
+	 */
+	@Override
+	public T next() {
+		if (next == null) {
+			readNextFromStream();
+			if (next == null) {
+				throw new NoSuchElementException();
+			}
+		}
+		T current = next;
+		next = null;
+		return current;
+	}
+
+	/**
+	 * Reads the next element into {@link #next}, or sets it to null at the end of the stream.
+	 * Blocks until the accept thread has finished.
+	 *
+	 * @throws RuntimeException if the calling thread is interrupted, if accepting the connection
+	 *         failed, or if the stream cannot be read
+	 */
+	private void readNextFromStream() {
+		try {
+			connectionAccepted.await();
+		} catch (InterruptedException e) {
+			// Preserve the interrupt status for the caller.
+			Thread.currentThread().interrupt();
+			throw new RuntimeException("The calling thread of DataStreamIterator.readNextFromStream was interrupted.", e);
+		}
+		if (streamReader == null) {
+			throw new RuntimeException("DataStreamIterator.AcceptThread failed", acceptFailure);
+		}
+		try {
+			next = serializer.deserialize(streamReader);
+		} catch (EOFException e) {
+			next = null;
+		} catch (IOException e) {
+			throw new RuntimeException("DataStreamIterator could not read from deserializedStream", e);
+		}
+	}
+
+	/** A {@link DataInputStream} that can be used as a Flink {@link DataInputView}. */
+	private static class StreamReaderDataInputView extends DataInputStream implements DataInputView {
+
+		public StreamReaderDataInputView(InputStream stream) {
+			super(stream);
+		}
+
+		/**
+		 * Skips exactly {@code numBytes} bytes.
+		 *
+		 * @throws EOFException if the stream ends before all bytes were skipped
+		 */
+		@Override
+		public void skipBytesToRead(int numBytes) throws IOException {
+			while (numBytes > 0) {
+				int skipped = skipBytes(numBytes);
+				if (skipped == 0) {
+					// skipBytes may skip nothing without being at the end; read one byte to tell.
+					if (read() < 0) {
+						throw new EOFException("Could not skip " + numBytes + " more bytes: end of stream reached.");
+					}
+					skipped = 1;
+				}
+				numBytes -= skipped;
+			}
+		}
+	}
+
+	@Override
+	public void remove() {
+		throw new UnsupportedOperationException();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
new file mode 100644
index 0000000..276409d
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.runtime.net.NetUtils;
+
+/**
+ * Utility methods for {@link DataStream}s.
+ */
+public class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 *
+	 * <p>A sink with parallelism 1 is added to the stream that sends every element over a TCP
+	 * connection to the returned iterator, and the stream's execution environment is executed
+	 * in a background thread.
+	 *
+	 * @param stream The DataStream whose elements are collected.
+	 * @param <OUT> The type of the elements.
+	 * @return The iterator
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		TypeSerializer<OUT> serializer = stream.getType().createSerializer(env.getConfig());
+		DataStreamIterator<OUT> it = new DataStreamIterator<OUT>(serializer);
+
+		// Find out which IP address of this machine CollectSink should connect back to. For a
+		// remote environment, NetUtils picks a local address that can reach the master at the
+		// environment's host/port; otherwise the local host is used.
+		InetAddress clientAddress;
+		if (env instanceof RemoteStreamEnvironment) {
+			RemoteStreamEnvironment remoteEnv = (RemoteStreamEnvironment) env;
+			String host = remoteEnv.getHost();
+			int port = remoteEnv.getPort();
+			try {
+				clientAddress = NetUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400);
+			} catch (IOException e) {
+				throw new RuntimeException("IOException while trying to connect to the master", e);
+			}
+		} else {
+			try {
+				clientAddress = InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new RuntimeException("getLocalHost failed", e);
+			}
+		}
+
+		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress, it.getPort(), serializer));
+		sink.setParallelism(1); // It would not work if multiple instances would connect to the same port
+
+		new CallExecute<OUT>(stream).start();
+
+		return it;
+	}
+
+	/** Thread that executes the execution environment of the given stream. */
+	private static class CallExecute<OUT> extends Thread {
+
+		private final DataStream<OUT> stream;
+
+		public CallExecute(DataStream<OUT> stream) {
+			this.stream = stream;
+		}
+
+		@Override
+		public void run() {
+			try {
+				stream.getExecutionEnvironment().execute();
+			} catch (Exception e) {
+				throw new RuntimeException("Exception in execute()", e);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/.hidden
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
new file mode 100644
index 0000000..f21e58c
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Test;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.junit.Assert;
+
+import java.util.Iterator;
+
+/**
+ * This test verifies the behavior of DataStreamUtils.collect.
+ *
+ * <p>It collects the sequence 1..N from a job running on a local mini cluster and checks that
+ * every number arrives exactly once and in order.
+ */
+public class CollectITCase {
+
+	@Test
+	public void testCollect() {
+		Configuration config = new Configuration();
+		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
+		try {
+			final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getJobManagerRPCPort());
+
+			long N = 10;
+			DataStream<Long> stream = env.generateSequence(1, N);
+
+			long i = 1;
+			for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) {
+				long x = it.next();
+				if (x != i) {
+					Assert.fail(String.format("Should have got %d, got %d instead.", i, x));
+				}
+				i++;
+			}
+			if (i != N + 1) {
+				Assert.fail(String.format("Should have collected %d numbers, got %d instead.", N, i - 1));
+			}
+		} finally {
+			// Release the mini cluster's resources even if the test fails.
+			cluster.shutdown();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/pom.xml b/flink-contrib/flink-tweet-inputformat/pom.xml
new file mode 100644
index 0000000..6762a0b
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-contrib-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <!-- Input format that parses JSON-encoded tweets (SimpleTweetInputFormat / TweetHandler). -->
+ <artifactId>flink-tweet-inputformat</artifactId>
+ <name>flink-tweet-inputformat</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Streaming JSON parser (org.json.simple.parser) used by SimpleTweetInputFormat and TweetHandler. -->
+ <!-- NOTE(review): SimpleTweetInputFormat also imports org.codehaus.jackson.JsonParseException,
+ which is not declared here and presumably arrives transitively; confirm or declare it. -->
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/.hidden
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
new file mode 100644
index 0000000..a72fc14
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+/**
+ * Input format that reads one JSON-encoded tweet per delimited record and parses it into a
+ * {@link Tweet} using json-simple's streaming {@link JSONParser} and a {@link TweetHandler}.
+ *
+ * <p>Records that fail to parse are logged at debug level; the returned {@link Tweet} then holds
+ * only the fields that were parsed before the error.
+ */
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+	private transient JSONParser parser;
+	private transient TweetHandler handler;
+
+	@Override
+	public void open(FileInputSplit split) throws IOException {
+		super.open(split);
+		this.handler = new TweetHandler();
+		this.parser = new JSONParser();
+	}
+
+	/**
+	 * Resets the reused tweet and reads the next record into it.
+	 *
+	 * @param record the tweet object to reuse
+	 * @return the record as returned by {@link DelimitedInputFormat#nextRecord}
+	 */
+	@Override
+	public Tweet nextRecord(Tweet record) throws IOException {
+		// Clear the reused object so that fields of the previous tweet do not leak into this one.
+		record.reset(0);
+		return super.nextRecord(record);
+	}
+
+	/**
+	 * Parses the JSON bytes {@code bytes[offset, offset + numBytes)} into {@code reuse}.
+	 *
+	 * @return {@code reuse}, filled with the parsed fields
+	 */
+	@Override
+	public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+		// Decode exactly the bytes of this record as UTF-8 (the JSON encoding). Skipping
+		// characters of a reader over the whole buffer miscounts multi-byte characters and
+		// can read beyond the end of this record.
+		InputStreamReader jsonReader = new InputStreamReader(
+				new ByteArrayInputStream(bytes, offset, numBytes), "UTF-8");
+
+		try {
+			handler.reuse = reuse;
+			parser.parse(jsonReader, handler, false);
+		} catch (ParseException e) {
+			LOG.debug(" Tweet Parsing Exception : " + e.getMessage());
+			// A failed parse can leave the handler mid-object (nesting depth, object state);
+			// start the next record with a fresh handler.
+			handler = new TweetHandler();
+		}
+
+		return reuse;
+	}
+
+	@Override
+	public TypeInformation<Tweet> getProducedType() {
+		return new GenericTypeInfo<Tweet>(Tweet.class);
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
new file mode 100644
index 0000000..2b9f389
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/io/TweetHandler.java
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Contributors;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
+import org.json.simple.parser.ContentHandler;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Locale;
+
+
+public class TweetHandler implements ContentHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(TweetHandler.class);
+
+ /** Tweet that parsed values are written into; assigned by the caller before each parse. */
+ protected Tweet reuse;
+
+ /** Depth of the currently open JSON objects: 1 while directly inside the top-level tweet object. */
+ private int nesting = 0;
+
+ /** Part of the tweet currently being parsed (tweet, user, place, ...); selects the primitive handler. */
+ private ObjectState objectState = ObjectState.TWEET;
+
+ /** State derived from the most recent object key (see startObjectEntry); initially UNEXPECTED. */
+ private EntryState entryState = EntryState.UNEXPECTED;
+
+ /**
+ * Whether hashtag texts are still recorded. Set in startJSON() and cleared in endArray(),
+ * because some tweets contain their hashtags twice.
+ */
+ private boolean sameHashTag = false;
+
+ // to handle the coordinates special case of nesting primitive types
+ // (both values are reset in endArray() while in the COORDINATES state)
+ private int coordinatesCounter = 0;
+
+ private double coordinatesTemp = 0.0d;
+
+
+	/** Begins a new JSON document; hashtag texts are recorded again from here on. */
+	@Override
+	public void startJSON() throws ParseException, IOException {
+		this.sameHashTag = true;
+	}
+
+	/** Ends a JSON document; no bookkeeping is required. */
+	@Override
+	public void endJSON() throws ParseException, IOException {
+		// intentionally empty
+	}
+
+	/**
+	 * Enters a JSON object by increasing the nesting depth.
+	 *
+	 * @return always true, parsing continues
+	 */
+	@Override
+	public boolean startObject() throws ParseException, IOException {
+		this.nesting += 1;
+		return true;
+	}
+
+	/**
+	 * Leaves a JSON object. Returning to depth 1 (directly inside the tweet object) switches the
+	 * object state back to TWEET.
+	 *
+	 * @return false once the outermost object has been closed, which tells JSONParser (it checks
+	 *         {@code !contentHandler.endObject()}) to stop parsing; true otherwise
+	 */
+	@Override
+	public boolean endObject() throws ParseException, IOException {
+		this.nesting -= 1;
+
+		if (nesting == 1) {
+			objectState = ObjectState.TWEET;
+		}
+
+		final boolean stillInsideAnObject = nesting > 0;
+		return stillInsideAnObject;
+	}
+
+ @Override
+ public boolean startObjectEntry(String key) throws ParseException, IOException {
+
+ if ((key.equals("contributors") || key.equals("user") || key.equals("geo") || key.equals("place") || key.equals("attributes") || key.equals("bounding_box"))) {
+ objectState = ObjectState.valueOf(key.toUpperCase());
+ } else if (key.equals("hashtags") && nesting == 2) {
+ objectState = ObjectState.valueOf(key.toUpperCase());
+ } else if (key.equals("coordinates") && (this.nesting == 1)) {
+ objectState = ObjectState.valueOf(key.toUpperCase());
+ } else {
+ try {
+ entryState = EntryState.valueOf(key.toUpperCase());
+ } catch (IllegalArgumentException e) {
+
+ logger.debug(e.getMessage());
+
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean endObjectEntry() throws ParseException, IOException {
+
+ if (objectState == ObjectState.CONTRIBUTORS && nesting == 1) {
+ objectState = ObjectState.TWEET;
+ }
+
+ return true;
+ }
+
+	/**
+	 * Starts a JSON array; no bookkeeping is required.
+	 *
+	 * @return always true, parsing continues
+	 */
+	@Override
+	public boolean startArray() throws ParseException, IOException {
+		final boolean continueParsing = true;
+		return continueParsing;
+	}
+
+ @Override
+ public boolean endArray() throws ParseException, IOException {
+
+ if (objectState == ObjectState.COORDINATES) {
+ coordinatesCounter = 0;
+ coordinatesTemp = 0.0d;
+ }
+
+
+ // Some tweets have HashTags twice, this condition to read only one of them
+ if (objectState == ObjectState.HASHTAGS && entryState == EntryState.INDICES && nesting == 2) {
+ sameHashTag = false;
+ }
+ return true;
+ }
+
+	/**
+	 * Handles a primitive JSON value by dispatching it to the handler of the current object
+	 * state. Values inside "geo" are ignored, and hashtag texts are only recorded while
+	 * {@code sameHashTag} is set. Any exception while storing the value (e.g. a value whose
+	 * type does not match the cast in the target handler) is logged at debug level and ignored
+	 * so that parsing continues.
+	 *
+	 * @param value the primitive value
+	 * @return always true, parsing is never stopped here
+	 */
+	@Override
+	public boolean primitive(Object value) throws ParseException, IOException {
+		try {
+			if (objectState == ObjectState.TWEET) {
+				tweetObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.USER) {
+				userObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.GEO) {
+				// values inside "geo" are not stored
+				return true;
+			} else if (objectState == ObjectState.COORDINATES) {
+				coordinatesObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.PLACE) {
+				placeObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.ATTRIBUTES) {
+				placeAttributesObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.CONTRIBUTORS) {
+				contributorsObjectStatePrimitiveHandler(value);
+			} else if (objectState == ObjectState.HASHTAGS && entryState == EntryState.TEXT && sameHashTag) {
+				hashTagsObjectStatePrimitiveHandler(value);
+			}
+		} catch (Exception e) {
+			logger.debug("Error in primitive type: " + e.getMessage());
+		}
+
+		return true;
+	}
+
+ public void tweetObjectStatePrimitiveHandler(Object value) {
+
+ switch (entryState) {
+ case CREATED_AT:
+ if (value != null) {
+ reuse.setCreated_at((String) value);
+ }
+ break;
+ case TEXT:
+ if (value != null) {
+ reuse.setText((String) value);
+ }
+ break;
+ case ID:
+ if (value != null) {
+ reuse.setId((Long) value);
+ }
+ break;
+ case ID_STR:
+ if (value != null) {
+ reuse.setId_str((String) value);
+ }
+ break;
+ case SOURCE:
+ if (value != null) {
+ reuse.setSource((String) value);
+ }
+ break;
+ case TRUNCATED:
+ if (value != null) {
+ reuse.setTruncated((Boolean) value);
+ }
+ break;
+ case IN_REPLY_TO_STATUS_ID:
+ if (value != null) {
+ reuse.setIn_reply_to_status_id((Long) value);
+ }
+ break;
+ case IN_REPLY_TO_STATUS_ID_STR:
+ if (value != null) {
+ reuse.setIn_reply_to_status_id_str((String) value);
+ }
+ break;
+ case IN_REPLY_TO_USER_ID:
+ if (value != null) {
+ reuse.setIn_reply_to_user_id((Long) value);
+ }
+ break;
+ case IN_REPLY_TO_USER_ID_STR:
+ if (value != null) {
+ reuse.setIn_reply_to_user_id_str((String) value);
+ }
+ break;
+ case IN_REPLY_TO_SCREEN_NAME:
+ if (value != null) {
+ reuse.setIn_reply_to_screen_name((String) value);
+ }
+ break;
+ case RETWEET_COUNT:
+ if (value != null) {
+ reuse.setRetweet_count((Long) value);
+ }
+ break;
+ case FAVORITE_COUNT:
+ if (value != null) {
+ reuse.setFavorite_count((Long) value);
+ }
+ break;
+ case FAVORITED:
+ if (value != null) {
+ reuse.setFavorited((Boolean) value);
+ }
+ break;
+ case RETWEETED:
+ if (value != null) {
+ reuse.setRetweeted((Boolean) value);
+ }
+ break;
+ case POSSIBLY_SENSITIVE:
+ if (value != null) {
+ reuse.setPossibly_sensitive((Boolean) value);
+ }
+ break;
+ case FILTER_LEVEL:
+ if (value != null) {
+ reuse.setFilter_level((String) value);
+ }
+ break;
+ case LANG:
+ if (value != null) {
+ reuse.setLang((String) value);
+ }
+ break;
+ }
+ }
+
+ public void userObjectStatePrimitiveHandler(Object value) {
+
+ switch (entryState) {
+ case ID:
+ if (value != null) {
+ // handle format exception caused by wrong values in the "id" field in the
+ // tweets.
+ if (value instanceof String) {
+ try {
+ reuse.getUser().setId(Long.parseLong((String) value));
+ } catch (NumberFormatException e) {
+ reuse.getUser().setId(0L);
+ logger.debug("This Tweet_ID is not a numeric type : " + (String) value);
+ }
+ } else {
+ reuse.getUser().setId((Long) value);
+ }
+ }
+ break;
+ case ID_STR:
+ if (value != null) {
+ reuse.getUser().setId_str((String) value);
+ }
+ break;
+ case NAME:
+ if (value != null) {
+ reuse.getUser().setName((String) value);
+ }
+ break;
+ case SCREEN_NAME:
+ if (value != null) {
+ reuse.getUser().setScreen_name((String) value);
+ }
+ break;
+ case LOCATION:
+ if (value != null) {
+ reuse.getUser().setLocation((String) value);
+ }
+ break;
+ case URL:
+ if (value != null) {
+ reuse.getUser().setUrl((String) value);
+ }
+ break;
+ case DESCRIPTION:
+ if (value != null) {
+ reuse.getUser().setDescription((String) value);
+ }
+ break;
+ case PROTECTED:
+ if (value != null) {
+ reuse.getUser().setProtected_tweet((Boolean) value);
+ }
+ break;
+ case VERIFIED:
+ if (value != null) {
+ reuse.getUser().setVerified((Boolean) value);
+ }
+ break;
+ case FOLLOWERS_COUNT:
+ if (value != null) {
+ reuse.getUser().setFollowers_count((Long) value);
+ }
+ break;
+ case FRIENDS_COUNT:
+ if (value != null) {
+ reuse.getUser().setFriends_count((Long) value);
+ }
+ break;
+ case LISTED_COUNT:
+ if (value != null) {
+ reuse.getUser().setListed_count((Long) value);
+ }
+ break;
+ case FAVOURITES_COUNT:
+ if (value != null) {
+ reuse.getUser().setFavourites_count((Long) value);
+ }
+ break;
+ case STATUSES_COUNT:
+ if (value != null) {
+ reuse.getUser().setStatuses_count((Long) value);
+ }
+ break;
+ case CREATED_AT:
+ if (value != null) {
+ reuse.getUser().setCreated_at((String) value);
+ }
+ break;
+ case UTC_OFFSET:
+ if (value != null) {
+ reuse.getUser().setUtc_offset((Long) value);
+ }
+ break;
+ case TIME_ZONE:
+ if (value != null) {
+ reuse.getUser().setTime_zone((String) value);
+ }
+ break;
+ case GEO_ENABLED:
+ if (value != null) {
+ reuse.getUser().setGeo_enabled((Boolean) value);
+ }
+ break;
+ case LANG:
+ if (value != null) {
+ reuse.getUser().setLang((String) value);
+ }
+ break;
+ case CONTRIBUTORS_ENABLED:
+ if (value != null) {
+ reuse.getUser().setContributors_enabled((Boolean) value);
+ }
+ break;
+ case IS_TRANSLATOR:
+ if (value != null) {
+ reuse.getUser().setIs_translator((Boolean) value);
+ }
+ break;
+ case PROFILE_BACKGROUND_COLOR:
+ if (value != null) {
+ reuse.getUser().setProfile_background_color((String) value);
+ }
+ break;
+ case PROFILE_BACKGROUND_IMAGE_URL:
+ if (value != null) {
+ reuse.getUser().setProfile_background_image_url((String) value);
+ }
+ break;
+ case PROFILE_BACKGROUND_IMAGE_URL_HTTPS:
+ if (value != null) {
+ reuse.getUser().setProfile_background_image_url_https((String) value);
+ }
+ break;
+ case PROFILE_BACKGROUND_TILE:
+ if (value != null) {
+ reuse.getUser().setProfile_background_tile((Boolean) value);
+ }
+ break;
+ case PROFILE_LINK_COLOR:
+ if (value != null) {
+ reuse.getUser().setProfile_link_color((String) value);
+ }
+ break;
+ case PROFILE_SIDEBAR_BORDER_COLOR:
+ if (value != null) {
+ reuse.getUser().setProfile_sidebar_border_color((String) value);
+ }
+ break;
+ case PROFILE_SIDEBAR_FILL_COLOR:
+ if (value != null) {
+ reuse.getUser().setProfile_sidebar_fill_color((String) value);
+ }
+ break;
+ case PROFILE_TEXT_COLOR:
+ if (value != null) {
+ reuse.getUser().setProfile_text_color((String) value);
+ }
+ break;
+ case PROFILE_USE_BACKGROUND_IMAGE:
+ if (value != null) {
+ reuse.getUser().setProfile_use_background_image((Boolean) value);
+ }
+ break;
+ case PROFILE_IMAGE_URL:
+ if (value != null) {
+ reuse.getUser().setProfile_image_url((String) value);
+ }
+ break;
+ case PROFILE_IMAGE_URL_HTTPS:
+ if (value != null) {
+ reuse.getUser().setProfile_image_url_https((String) value);
+ }
+ break;
+ case PROFILE_BANNER_URL:
+ if (value != null) {
+ reuse.getUser().setProfile_banner_url((String) value);
+ }
+ break;
+ case DEFAULT_PROFILE:
+ if (value != null) {
+ reuse.getUser().setDefault_profile((Boolean) value);
+ }
+ break;
+ case DEFAULT_PROFILE_IMAGE:
+ if (value != null) {
+ reuse.getUser().setDefault_profile_image((Boolean) value);
+ }
+ break;
+ case FOLLOWING:
+ if (value != null) {
+ reuse.getUser().setFollowing((Boolean) value);
+ }
+ break;
+ case FOLLOW_REQUEST_SENT:
+ if (value != null) {
+ reuse.getUser().setFollow_request_sent((Boolean) value);
+ }
+ break;
+ case NOTIFICATIONS:
+ if (value != null) {
+ reuse.getUser().setNotifications((Boolean) value);
+ }
+ break;
+ }
+ }
+
+ public void coordinatesObjectStatePrimitiveHandler(Object value) {
+
+ switch (entryState) {
+ case COORDINATES:
+ if (value != null && this.coordinatesCounter == 0) {
+ coordinatesTemp = (Double) value;
+ this.coordinatesCounter++;
+ } else if (value != null && this.coordinatesCounter == 1) {
+ reuse.getCoordinates().setCoordinates(coordinatesTemp, (Double) value);
+ } else {
+ reuse.getCoordinates().setCoordinates(0.0d, 0.0d);
+ }
+ break;
+ }
+
+ }
+
+ public void placeObjectStatePrimitiveHandler(Object value) {
+
+ switch (entryState) {
+ case ID:
+ if (value != null) {
+ reuse.getPlace().setId((String) value);
+ }
+ break;
+ case URL:
+ if (value != null) {
+ reuse.getPlace().setUrl((String) value);
+ }
+ break;
+ case PLACE_TYPE:
+ if (value != null) {
+ reuse.getPlace().setPlace_type((String) value);
+ }
+ break;
+ case NAME:
+ if (value != null) {
+ reuse.getPlace().setName((String) value);
+ }
+ break;
+ case FULL_NAME:
+ if (value != null) {
+ reuse.getPlace().setFull_name((String) value);
+ }
+ break;
+ case COUNTRY_CODE:
+ if (value != null) {
+ reuse.getPlace().setCountry_code((String) value);
+ }
+ break;
+ case COUNTRY:
+ if (value != null) {
+ reuse.getPlace().setCountry((String) value);
+ }
+ break;
+
+ // Skipped BoundingBox -- Not Required
+
+
+ }
+ }
+
+ public void placeAttributesObjectStatePrimitiveHandler(Object value) {
+
+ switch (entryState) {
+ case STREET_ADDRESS:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setStreet_address((String) value);
+ }
+ break;
+ case LOCALITY:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setLocality((String) value);
+ }
+ break;
+ case REGION:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setRegion((String) value);
+ }
+ break;
+ case ISO3:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setIso3((String) value);
+ }
+ break;
+ case POSTAL_CODE:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setPostal_code((String) value);
+ }
+ break;
+ case PHONE:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setPhone((String) value);
+ }
+ break;
+ case URL:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setUrl((String) value);
+ }
+ break;
+ case APP_ID:
+ if (value != null) {
+ reuse.getPlace().getAttributes().setAppId((String) value);
+ }
+ break;
+ // Skipped BoundingBox -- Not Required
+
+ }
+
+
+ }
+
+ public void contributorsObjectStatePrimitiveHandler(Object value) {
+
+ // to handle the case of the null as contributors is an array in the Twitter documentation
+ // && if it is not null we initialize the object and fill it with the data,
+ if (value == null) {
+ reuse.getContributors().add(new Contributors());
+ } else {
+
+ Contributors contributor = new Contributors();
+
+ switch (entryState) {
+ case ID:
+ if (value != null) {
+ contributor.setId((Long) value);
+ }
+ break;
+ case ID_STR:
+ if (value != null) {
+ contributor.setId_str((String) value);
+ }
+ break;
+ case TWEET_CONTRIBUTORS_SCREEN_NAME:
+ if (value != null) {
+ contributor.setScreenName((String) value);
+ }
+ break;
+ }
+ reuse.getContributors().add(contributor);
+
+ }
+
+
+ }
+
+ public void hashTagsObjectStatePrimitiveHandler(Object value) {
+
+ HashTags hashTag = new HashTags();
+
+ if (value == null) {
+ return;
+ } else if (entryState == EntryState.TEXT && value != null) {
+ hashTag.setText((String) value, false);
+ reuse.getEntities().getHashtags().add(hashTag);
+ }
+ }
+
+ private static enum ObjectState {
+ TWEET,
+ CONTRIBUTORS,
+ USER,
+ GEO,
+ COORDINATES,
+ PLACE,
+ ATTRIBUTES,
+ BOUNDING_BOX,
+ HASHTAGS;
+
+ }
+
+ private static enum EntryState {
+ TEXT,
+ CREATED_AT,
+ ID,
+ ID_STR,
+ SOURCE,
+ TRUNCATED,
+ IN_REPLY_TO_STATUS_ID,
+ IN_REPLY_TO_STATUS_ID_STR,
+ IN_REPLY_TO_USER_ID,
+ IN_REPLY_TO_USER_ID_STR,
+ IN_REPLY_TO_SCREEN_NAME,
+ RETWEET_COUNT,
+ FAVORITE_COUNT,
+ FAVORITED,
+ RETWEETED,
+ POSSIBLY_SENSITIVE,
+ FILTER_LEVEL,
+ TWEET_CONTRIBUTORS_SCREEN_NAME,
+ SCREEN_NAME,
+ LOCATION,
+ DESCRIPTION,
+ PROTECTED,
+ VERIFIED,
+ FOLLOWERS_COUNT,
+ FRIENDS_COUNT,
+ LISTED_COUNT,
+ FAVOURITES_COUNT,
+ STATUSES_COUNT,
+ UTC_OFFSET,
+ TIME_ZONE,
+ GEO_ENABLED,
+ LANG,
+ CONTRIBUTORS_ENABLED,
+ IS_TRANSLATOR,
+ PROFILE_BACKGROUND_COLOR,
+ PROFILE_BACKGROUND_IMAGE_URL,
+ PROFILE_BACKGROUND_IMAGE_URL_HTTPS,
+ PROFILE_BACKGROUND_TILE,
+ PROFILE_LINK_COLOR,
+ PROFILE_SIDEBAR_BORDER_COLOR,
+ PROFILE_SIDEBAR_FILL_COLOR,
+ PROFILE_TEXT_COLOR,
+ PROFILE_USE_BACKGROUND_IMAGE,
+ PROFILE_IMAGE_URL,
+ PROFILE_IMAGE_URL_HTTPS,
+ PROFILE_BANNER_URL,
+ DEFAULT_PROFILE,
+ DEFAULT_PROFILE_IMAGE,
+ FOLLOWING,
+ FOLLOW_REQUEST_SENT,
+ NOTIFICATIONS,
+ TYPE,
+ COORDINATES,
+ PLACE_TYPE,
+ NAME,
+ FULL_NAME,
+ COUNTRY_CODE,
+ COUNTRY,
+ BOUNDING_BOX,
+ ATTRIBUTES,
+ STREET_ADDRESS,
+ LOCALITY,
+ REGION,
+ ISO3,
+ POSTAL_CODE,
+ PHONE,
+ URL,
+ ENTITIES,
+ HASHTAGS,
+ TRENDS,
+ URLS,
+ USER_MENTIONS,
+ SYMBOLS,
+ MEDIA,
+ INDICES,
+ MEDIA_URL,
+ MEDIA_URL_HTTPS,
+ DISPLAY_URL,
+ EXPANDED_URL,
+ SIZES,
+ LARGE,
+ W,
+ H,
+ RESIZE,
+ SMALL,
+ THUMB,
+ MEDIUM,
+ RETWEETED_STATUS,
+ SOURCE_STATUS_ID,
+ SOURCE_STATUS_ID_STR,
+ SCOPES,
+ FOLLOWERS,
+ APP_ID,
+ UNEXPECTED;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
new file mode 100755
index 0000000..839b54f
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/User/Users.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.User;
+
+
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * {@link Users} can be anyone or anything. They {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}
+ * , follow, create lists, have a home_timeline, can be mentioned, and can be looked up in bulk.
+ */
+public class Users {
+
+
+ private boolean contributors_enabled;
+
+ private String created_at = "";
+
+ private boolean default_profile;
+
+ private boolean default_profile_image;
+
+ private String description = "";
+
+ private Entities entities;
+
+ private long favourites_count;
+
+ private boolean follow_request_sent;
+
+ private boolean following;
+
+ private long followers_count;
+
+ private long friends_count;
+
+ private boolean geo_enabled;
+
+ private long id;
+
+ private String id_str = "";
+
+ private boolean is_translator;
+
+ private String lang = "";
+
+ private long listed_count;
+
+ private String location = "";
+
+ private String name = "";
+
+ private boolean notifications;
+
+ private String profile_background_color = "";
+
+ private String profile_background_image_url = "";
+
+ private String profile_background_image_url_https = "";
+
+ private boolean profile_background_tile;
+
+ private String profile_banner_url = "";
+
+ private String profile_image_url = "";
+
+ private String profile_image_url_https = "";
+
+ private String profile_link_color = "";
+
+ private String profile_sidebar_border_color = "";
+
+ private String profile_sidebar_fill_color = "";
+
+ private String profile_text_color = "";
+
+ private boolean profile_use_background_image;
+
+ private boolean protected_tweet;
+
+ private String screen_name = "";
+
+ private long statuses_count;
+
+ private String time_zone = "";
+
+ private String url = "";
+
+ private long utc_offset;
+
+ private boolean verified;
+
+ public Users() {
+ reset();
+ }
+
+ // to avoid FLINK KRYO serializer problem
+ public void reset() {
+
+ contributors_enabled = false;
+ created_at = "";
+ default_profile = false;
+ default_profile_image = false;
+ description = "";
+ entities = new Entities();
+ favourites_count = 0L;
+ follow_request_sent = false;
+ following = false;
+ followers_count = 0L;
+ friends_count = 0L;
+ geo_enabled = false;
+ id = 0L;
+ id_str = "";
+ is_translator = false;
+ lang = "";
+ listed_count = 0L;
+ location = "";
+ name = "";
+ notifications = false;
+ profile_background_color = "";
+ profile_background_image_url = "";
+ profile_background_image_url_https = "";
+ profile_background_tile = false;
+ profile_banner_url = "";
+ profile_image_url = "";
+ profile_image_url_https = "";
+ profile_link_color = "";
+ profile_sidebar_border_color = "";
+ profile_sidebar_fill_color = "";
+ profile_text_color = "";
+ profile_use_background_image = false;
+ protected_tweet = false;
+ screen_name = "";
+ statuses_count = 0L;
+ time_zone = "";
+ url = "";
+ utc_offset = 0L;
+ verified = false;
+
+ }
+
+ private String getUTCTime() {
+
+ Date date = new Date();
+ SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
+ return format.format(date);
+
+ }
+
+ public boolean isContributors_enabled() {
+ return contributors_enabled;
+ }
+
+ public void setContributors_enabled(boolean contributors_enabled) {
+ this.contributors_enabled = contributors_enabled;
+ }
+
+ public String getCreated_at() {
+ return created_at;
+ }
+
+ public void setCreated_at(String created_at) {
+ this.created_at = created_at;
+ }
+
+ public boolean isDefault_profile() {
+ return default_profile;
+ }
+
+ public void setDefault_profile(boolean default_profile) {
+ this.default_profile = default_profile;
+ }
+
+ public boolean isDefault_profile_image() {
+ return default_profile_image;
+ }
+
+ public void setDefault_profile_image(boolean default_profile_image) {
+ this.default_profile_image = default_profile_image;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ public Entities getEntities() {
+ return entities;
+ }
+
+ public void setEntities(Entities entities) {
+ this.entities = entities;
+ }
+
+ public long getFavourites_count() {
+ return favourites_count;
+ }
+
+ public void setFavourites_count(long favourites_count) {
+ this.favourites_count = favourites_count;
+ }
+
+ public boolean isFollow_request_sent() {
+ return follow_request_sent;
+ }
+
+ public void setFollow_request_sent(boolean follow_request_sent) {
+ this.follow_request_sent = follow_request_sent;
+ }
+
+ public boolean isFollowing() {
+ return following;
+ }
+
+ public void setFollowing(boolean following) {
+ this.following = following;
+ }
+
+ public long getFollowers_count() {
+ return followers_count;
+ }
+
+ public void setFollowers_count(long followers_count) {
+ this.followers_count = followers_count;
+ }
+
+ public long getFriends_count() {
+ return friends_count;
+ }
+
+ public void setFriends_count(long friends_count) {
+ this.friends_count = friends_count;
+ }
+
+ public boolean isGeo_enabled() {
+ return geo_enabled;
+ }
+
+ public void setGeo_enabled(boolean geo_enabled) {
+ this.geo_enabled = geo_enabled;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getId_str() {
+ return Long.toString(id);
+ }
+
+ public void setId_str(String id_str) {
+ this.id_str = id_str;
+ }
+
+ public boolean isIs_translator() {
+ return is_translator;
+ }
+
+ public void setIs_translator(boolean is_translator) {
+ this.is_translator = is_translator;
+ }
+
+ public String getLang() {
+ return lang;
+ }
+
+ public void setLang(String lang) {
+ this.lang = lang;
+ }
+
+ public long getListed_count() {
+ return listed_count;
+ }
+
+ public void setListed_count(long listed_count) {
+ this.listed_count = listed_count;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public boolean isNotifications() {
+ return notifications;
+ }
+
+ public void setNotifications(boolean notifications) {
+ this.notifications = notifications;
+ }
+
+ public String getProfile_background_color() {
+ return profile_background_color;
+ }
+
+ public void setProfile_background_color(String profile_background_color) {
+ this.profile_background_color = profile_background_color;
+ }
+
+ public String getProfile_background_image_url() {
+ return profile_background_image_url;
+ }
+
+ public void setProfile_background_image_url(String profile_background_image_url) {
+ this.profile_background_image_url = profile_background_image_url;
+ }
+
+ public String getProfile_background_image_url_https() {
+ return profile_background_image_url_https;
+ }
+
+ public void setProfile_background_image_url_https(String profile_background_image_url_https) {
+ this.profile_background_image_url_https = profile_background_image_url_https;
+ }
+
+ public boolean isProfile_background_tile() {
+ return profile_background_tile;
+ }
+
+ public void setProfile_background_tile(boolean profile_background_tile) {
+ this.profile_background_tile = profile_background_tile;
+ }
+
+ public String getProfile_banner_url() {
+ return profile_banner_url;
+ }
+
+ public void setProfile_banner_url(String profile_banner_url) {
+ this.profile_banner_url = profile_banner_url;
+ }
+
+ public String getProfile_image_url() {
+ return profile_image_url;
+ }
+
+ public void setProfile_image_url(String profile_image_url) {
+ this.profile_image_url = profile_image_url;
+ }
+
+ public String getProfile_image_url_https() {
+ return profile_image_url_https;
+ }
+
+ public void setProfile_image_url_https(String profile_image_url_https) {
+ this.profile_image_url_https = profile_image_url_https;
+ }
+
+ public String getProfile_link_color() {
+ return profile_link_color;
+ }
+
+ public void setProfile_link_color(String profile_link_color) {
+ this.profile_link_color = profile_link_color;
+ }
+
+ public String getProfile_sidebar_border_color() {
+ return profile_sidebar_border_color;
+ }
+
+ public void setProfile_sidebar_border_color(String profile_sidebar_border_color) {
+ this.profile_sidebar_border_color = profile_sidebar_border_color;
+ }
+
+ public String getProfile_sidebar_fill_color() {
+ return profile_sidebar_fill_color;
+ }
+
+ public void setProfile_sidebar_fill_color(String profile_sidebar_fill_color) {
+ this.profile_sidebar_fill_color = profile_sidebar_fill_color;
+ }
+
+ public String getProfile_text_color() {
+ return profile_text_color;
+ }
+
+ public void setProfile_text_color(String profile_text_color) {
+ this.profile_text_color = profile_text_color;
+ }
+
+ public boolean isProfile_use_background_image() {
+ return profile_use_background_image;
+ }
+
+ public void setProfile_use_background_image(boolean profile_use_background_image) {
+ this.profile_use_background_image = profile_use_background_image;
+ }
+
+ public boolean isProtected_tweet() {
+ return protected_tweet;
+ }
+
+ public void setProtected_tweet(boolean protected_tweet) {
+ this.protected_tweet = protected_tweet;
+ }
+
+ public String getScreen_name() {
+ return screen_name;
+ }
+
+ public void setScreen_name(String screen_name) {
+ this.screen_name = screen_name;
+ }
+
+ public long getStatuses_count() {
+ return statuses_count;
+ }
+
+ public void setStatuses_count(long statuses_count) {
+ this.statuses_count = statuses_count;
+ }
+
+ public String getTime_zone() {
+ return time_zone;
+ }
+
+ public void setTime_zone(String time_zone) {
+ this.time_zone = time_zone;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getUtc_offset() {
+ return utc_offset;
+ }
+
+ public void setUtc_offset(long utc_offset) {
+ this.utc_offset = utc_offset;
+ }
+
+ public boolean isVerified() {
+ return verified;
+ }
+
+ public void setVerified(boolean verified) {
+ this.verified = verified;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
new file mode 100755
index 0000000..7cf9695
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Attributes.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+public class Attributes {
+
+ private String street_address = "";
+
+ private String locality = "";
+
+ private String region = "";
+
+ private String iso3 = "";
+
+ private String postal_code = "";
+
+ private String phone = "";
+
+ private String twitter = "twitter";
+
+ private String url = "";
+
+ // in the API it is app:id !!
+ private String appId = "";
+
+ public Attributes() {
+
+ }
+
+
+ public String getStreet_address() {
+ return street_address;
+ }
+
+ public void setStreet_address(String street_address) {
+ this.street_address = street_address;
+ }
+
+ public String getLocality() {
+ return locality;
+ }
+
+ public void setLocality(String locality) {
+ this.locality = locality;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getIso3() {
+ return iso3;
+ }
+
+ public void setIso3(String iso3) {
+ this.iso3 = iso3;
+ }
+
+ public String getPostal_code() {
+ return postal_code;
+ }
+
+ public void setPostal_code(String postal_code) {
+ this.postal_code = postal_code;
+ }
+
+ public String getPhone() {
+ return phone;
+ }
+
+ public void setPhone(String phone) {
+ this.phone = phone;
+ }
+
+ public String getTwitter() {
+ return twitter;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
new file mode 100755
index 0000000..e885028
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/BoundingBox.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Longitude/latitude points describing a box that contains the Place it belongs to.
 *
 * <p>Each point is a {@code double[]} of the form [longitude, latitude]. The points of one box
 * are grouped into a list, and that list is wrapped in one more list to match the polygon
 * notation. The geometry type defaults to {@code "Polygon"}.
 */
public class BoundingBox {

	private List<List<double[]>> coordinates;

	private String type;

	/** Creates an empty polygon with no rings. */
	public BoundingBox() {
		this.coordinates = new ArrayList<List<double[]>>();
		this.type = "Polygon";
	}

	/**
	 * Creates a polygon whose only ring is the given point list (stored by reference, not copied).
	 *
	 * @param points the [longitude, latitude] pairs of the box
	 */
	public BoundingBox(List<double[]> points) {
		this();
		coordinates.add(points);
	}

	public List<List<double[]>> getCoordinates() {
		return this.coordinates;
	}

	public void setCoordinates(List<List<double[]>> coordinates) {
		this.coordinates = coordinates;
	}

	public String getType() {
		return this.type;
	}

	public void setType(String type) {
		this.type = type;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
new file mode 100755
index 0000000..723644c
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/places/Places.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.places;
+
+/**
+ * {@link org.apache.flink.contrib.tweetinputformat.model.places.Places} are specific, named locations with
+ * corresponding geo {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Coordinates}. They can be attached
+ * to {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} by specifying a place_id when tweeting. <br>
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} associated with places are not necessarily
+ * issued from that location but could also potentially be about that location.<br>
+ * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} can be searched for. Tweets can also be found
+ * by place_id.
+ */
+public class Places {
+
+
+ private Attributes attributes;
+
+ private BoundingBox bounding_box;
+
+ private String country = "";
+
+ private String country_code = "";
+
+ private String full_name = "";
+
+ private String id = "";
+
+ private String name = "";
+
+ private String place_type = "";
+
+ private String url = "";
+
+
+ public Places() {
+ attributes = new Attributes();
+ bounding_box = new BoundingBox();
+
+ }
+
+ public Attributes getAttributes() {
+ return attributes;
+ }
+
+ public void setAttributes(Attributes attributes) {
+ this.attributes = attributes;
+ }
+
+ public BoundingBox getBounding_box() {
+ return bounding_box;
+ }
+
+ public void setBounding_box(BoundingBox bounding_box) {
+ this.bounding_box = bounding_box;
+ }
+
+ public String getCountry() {
+ return country;
+ }
+
+ public void setCountry(String country) {
+ this.country = country;
+ }
+
+ public String getCountry_code() {
+ return country_code;
+ }
+
+ public void setCountry_code(String country_code) {
+ this.country_code = country_code;
+ }
+
+ public String getFull_name() {
+ return full_name;
+ }
+
+ public void setFull_name(String full_name) {
+ this.full_name = full_name;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getPlace_type() {
+ return place_type;
+ }
+
+ public void setPlace_type(String place_type) {
+ this.place_type = place_type;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
new file mode 100755
index 0000000..85968eb
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Contributors.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
/**
 * Nullable. A collection of brief user objects (usually only one) indicating users who contributed
 * to the authorship of the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} on behalf of the
 * official tweet author.
 *
 * <p>One instance describes one contributing user by numeric id, string id and screen name.
 */
public class Contributors {

	/** Numeric user id. Primitive to match the {@code long} accessors and avoid autoboxing. */
	private long id = 0L;

	/** String form of the user id. */
	private String id_str = "";

	/** Screen name of the contributing user. */
	private String screenName = "";

	/** Creates an empty contributor: id {@code 0}, empty id string and screen name. */
	public Contributors() {
		reset();
	}

	/**
	 * Creates a contributor with the given values.
	 *
	 * @param id         numeric user id
	 * @param id_str     string form of the user id, stored as given
	 * @param screenName screen name of the user, stored as given
	 */
	public Contributors(long id, String id_str, String screenName) {
		this.id = id;
		this.id_str = id_str;
		this.screenName = screenName;
	}

	/** Restores the empty state: id {@code 0}, empty id string and empty screen name. */
	public void reset() {
		id = 0L;
		id_str = "";
		screenName = "";
	}

	/** @return the numeric user id */
	public long getId() {
		return id;
	}

	/** @param id the numeric user id */
	public void setId(long id) {
		this.id = id;
	}

	/** @return the string form of the user id */
	public String getId_str() {
		return id_str;
	}

	/** @param id_str the string form of the user id, stored as given */
	public void setId_str(String id_str) {
		this.id_str = id_str;
	}

	/** @return the screen name */
	public String getScreenName() {
		return screenName;
	}

	/** @param screenName the screen name, stored as given */
	public void setScreenName(String screenName) {
		this.screenName = screenName;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
new file mode 100755
index 0000000..c697e98
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Coordinates.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
/**
 * Nullable. Represents the geographic location of this
 * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} as reported by the user or client
 * application. The inner coordinates array is formatted as GeoJSON (longitude first, then latitude).
 */
public class Coordinates {

	/** Geometry type; fixed at {@code "point"} since there is no setter. */
	private String type = "point";

	/** {@code [longitude, latitude]}; a two-element array by default. */
	private double[] coordinates = new double[2];

	/** Creates coordinates at {@code (0.0, 0.0)}. */
	public Coordinates() {
	}

	/**
	 * Returns the backing array itself (not a copy); writes to it are visible to this object.
	 *
	 * @return the {@code [longitude, latitude]} array, or whatever was last passed to
	 *         {@link #setCoordinates(double[])}
	 */
	public double[] getCoordinates() {
		return coordinates;
	}

	/**
	 * Replaces the backing array. The array is stored as given: no copy, no length or null check.
	 *
	 * @param coordinates the new {@code [longitude, latitude]} array
	 */
	public void setCoordinates(double[] coordinates) {
		this.coordinates = coordinates;
	}

	/**
	 * Stores a longitude/latitude pair.
	 *
	 * <p>If the current array can hold two values it is written in place, so an array previously
	 * handed to {@link #setCoordinates(double[])} sees the update. If it is {@code null} or shorter
	 * than two elements, a fresh array is allocated instead of failing with an exception.
	 *
	 * @param longitude the longitude (GeoJSON order: first)
	 * @param latitude  the latitude (GeoJSON order: second)
	 */
	public void setCoordinates(double longitude, double latitude) {
		if (coordinates == null || coordinates.length < 2) {
			coordinates = new double[2];
		}
		coordinates[0] = longitude;
		coordinates[1] = latitude;
	}

	/** @return the geometry type, always {@code "point"} */
	public String getType() {
		return type;
	}

	/**
	 * Formats as {@code "longitude = X latitude = Y"}. Falls back to printing the raw array when it
	 * cannot hold a pair ({@code null} or fewer than two elements) rather than throwing.
	 */
	@Override
	public String toString() {
		if (coordinates == null || coordinates.length < 2) {
			return "coordinates = " + java.util.Arrays.toString(coordinates);
		}
		return "longitude = " + coordinates[0] + " latitude = " + coordinates[1];
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
new file mode 100755
index 0000000..f5a43a2
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/CurrentUserRetweet.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
/**
 * Details the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} ID of the user's own retweet (if
 * existent) of this {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
 */
public class CurrentUserRetweet {

	/** Numeric id of the retweet. */
	private long id;

	/** String form of the retweet id. */
	private String id_str = "";

	/** Creates an empty instance: id {@code 0}, empty id string. */
	public CurrentUserRetweet() {
		reset();
	}

	/** Restores the empty state: id {@code 0}, empty id string. */
	public void reset() {
		id = 0L;
		id_str = "";
	}

	/** @return the numeric retweet id */
	public long getId() {
		return id;
	}

	/**
	 * Sets the numeric id. The string id is not updated; call {@link #setId_str()} to derive it.
	 *
	 * @param id the numeric retweet id
	 */
	public void setId(long id) {
		this.id = id;
	}

	/** @return the string form of the retweet id */
	public String getId_str() {
		return id_str;
	}

	/** Derives the string id from the current numeric id via {@link Long#toString(long)}. */
	public void setId_str() {
		this.id_str = Long.toString(id);
	}

	/**
	 * Sets the string id directly. This mirrors the {@code setId_str(String)} setters of the other
	 * tweet model classes.
	 *
	 * @param id_str the string form of the retweet id, stored as given
	 */
	public void setId_str(String id_str) {
		this.id_str = id_str;
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
new file mode 100755
index 0000000..a2986a3
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet;
+
+import org.apache.flink.contrib.tweetinputformat.model.User.Users;
+import org.apache.flink.contrib.tweetinputformat.model.places.Places;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class Tweet {
+
+ private List<Contributors> contributors;
+
+ private Coordinates coordinates;
+
+ private String created_at = "";
+
+ private Entities entities;
+
+ private long favorite_count;
+
+ private boolean favorited;
+
+ private String filter_level = "";
+
+ private long id;
+
+ private String id_str = "";
+
+ private String in_reply_to_screen_name = "";
+
+ private long in_reply_to_status_id;
+
+ private String in_reply_to_status_id_str = "";
+
+ private long in_reply_to_user_id;
+
+ private String in_reply_to_user_id_str = "";
+
+ private String lang = "";
+
+ // Places
+ private Places place;
+
+ private boolean possibly_sensitive;
+
+ private long retweet_count;
+
+ private boolean retweeted;
+
+ private CurrentUserRetweet currentUserRetweet;
+
+ private String source = "";
+
+ private String text = "";
+
+ private boolean truncated;
+
+ private Users user;
+
+ // to Hanlde retweeted_status
+ private Tweet retweeted_status;
+
+ private int tweetLevel;
+
+ public Tweet() {
+ tweetLevel = 0;
+ reset(tweetLevel);
+ }
+
+ public Tweet(int level) {
+ tweetLevel = level;
+ reset(tweetLevel);
+ }
+
+
+ // to avoid FLINK KRYO serializer problem
+ public void reset(int level) {
+
+ contributors = new ArrayList<Contributors>();
+ coordinates = new Coordinates();
+ created_at = "";
+ entities = new Entities();
+ favorite_count = 0L;
+ favorited = false;
+ filter_level = "";
+ id = 0L;
+ id_str = "";
+ in_reply_to_screen_name = "";
+ in_reply_to_status_id = 0L;
+ in_reply_to_status_id_str = "";
+ in_reply_to_user_id = 0L;
+ in_reply_to_user_id_str = "";
+ lang = "";
+ place = new Places();
+ possibly_sensitive = false;
+ retweet_count = 0L;
+
+ // to Hanlde retweeted_status
+ if (level == 0) {
+ retweeted_status = new Tweet(++level);
+ }
+
+
+ currentUserRetweet = new CurrentUserRetweet();
+ retweeted = false;
+ source = "";
+ text = "";
+ truncated = false;
+ user = new Users();
+
+ }
+
+ public List<Contributors> getContributors() {
+ return contributors;
+ }
+
+ public void setContributors(List<Contributors> contributors) {
+ this.contributors = contributors;
+ }
+
+ public Coordinates getCoordinates() {
+ return coordinates;
+ }
+
+ public void setCoordinates(Coordinates coordinates) {
+ this.coordinates = coordinates;
+ }
+
+ public String getCreated_at() {
+ return created_at;
+ }
+
+ public void setCreated_at(String created_at) {
+ this.created_at = created_at;
+ }
+
+ public Entities getEntities() {
+ return entities;
+ }
+
+ public void setEntities(Entities entities) {
+ this.entities = entities;
+ }
+
+ public long getFavorite_count() {
+ return favorite_count;
+ }
+
+ public void setFavorite_count(long favorite_count) {
+ this.favorite_count = favorite_count;
+ }
+
+ public boolean isFavorited() {
+ return favorited;
+ }
+
+ public void setFavorited(boolean favorited) {
+ this.favorited = favorited;
+ }
+
+ public String getFilter_level() {
+ return filter_level;
+ }
+
+ public void setFilter_level(String filter_level) {
+ this.filter_level = filter_level;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getId_str() {
+ return id_str;
+ }
+
+ public void setId_str(String id_str) {
+ this.id_str = id_str;
+ }
+
+ public String getIn_reply_to_screen_name() {
+ return in_reply_to_screen_name;
+ }
+
+ public void setIn_reply_to_screen_name(String in_reply_to_screen_name) {
+ this.in_reply_to_screen_name = in_reply_to_screen_name;
+ }
+
+
+ public long getIn_reply_to_status_id() {
+ return in_reply_to_status_id;
+ }
+
+ public void setIn_reply_to_status_id(long in_reply_to_status_id) {
+ this.in_reply_to_status_id = in_reply_to_status_id;
+ }
+
+ public String getIn_reply_to_status_id_str() {
+ return in_reply_to_status_id_str;
+ }
+
+ public void setIn_reply_to_status_id_str(String in_reply_to_status_id_str) {
+ this.in_reply_to_status_id_str = in_reply_to_status_id_str;
+ }
+
+ public long getIn_reply_to_user_id() {
+ return in_reply_to_user_id;
+ }
+
+ public void setIn_reply_to_user_id(long in_reply_to_user_id) {
+ this.in_reply_to_user_id = in_reply_to_user_id;
+ }
+
+ public String getIn_reply_to_user_id_str() {
+ return in_reply_to_user_id_str;
+ }
+
+ public void setIn_reply_to_user_id_str(String in_reply_to_user_id_str) {
+ this.in_reply_to_user_id_str = in_reply_to_user_id_str;
+ }
+
+ public String getLang() {
+ return lang;
+ }
+
+ public void setLang(String lang) {
+ this.lang = lang;
+ }
+
+ public Places getPlace() {
+ return place;
+ }
+
+ public void setPlace(Places place) {
+ this.place = place;
+ }
+
+ public boolean getPossibly_sensitive() {
+ return possibly_sensitive;
+ }
+
+ public void setPossibly_sensitive(boolean possibly_sensitive) {
+ this.possibly_sensitive = possibly_sensitive;
+ }
+
+ public long getRetweet_count() {
+ return retweet_count;
+ }
+
+ public void setRetweet_count(long retweet_count) {
+ this.retweet_count = retweet_count;
+ }
+
+ public boolean isRetweeted() {
+ return retweeted;
+ }
+
+ public void setRetweeted(boolean retweeted) {
+ this.retweeted = retweeted;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public String getText() {
+ return text;
+ }
+
+ public void setText(String text) {
+ this.text = text;
+ }
+
+ public boolean isTruncated() {
+ return truncated;
+ }
+
+ public void setTruncated(boolean truncated) {
+ this.truncated = truncated;
+ }
+
+ public Users getUser() {
+ return user;
+ }
+
+ public void setUser(Users user) {
+ this.user = user;
+ }
+
+ public CurrentUserRetweet getCurrentUserRetweet() {
+ return currentUserRetweet;
+ }
+
+ public void setCurrentUserRetweet(CurrentUserRetweet currentUserRetweet) {
+ this.currentUserRetweet = currentUserRetweet;
+ }
+
+
+ public boolean isPossibly_sensitive() {
+ return possibly_sensitive;
+ }
+
+ public Tweet getRetweeted_status() {
+ return retweeted_status;
+ }
+
+ public void setRetweeted_status(Tweet retweeted_status) {
+ this.retweeted_status = retweeted_status;
+ }
+
+ public int getTweetLevel() {
+ return tweetLevel;
+ }
+
+ public void setTweetLevel(int tweetLevel) {
+ this.tweetLevel = tweetLevel;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
new file mode 100755
index 0000000..d88ea34
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Entities which have been parsed out of the text of the
+ * {@link package org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
+ */
+public class Entities {
+
+ private List<HashTags> hashtags;
+
+ private List<Media> media;
+
+ private List<URL> urls;
+
+ private List<UserMention> user_mentions;
+
+ private List<Symbol> symbols;
+
+ public Entities() {
+
+ hashtags = new ArrayList<HashTags>();
+ media = new ArrayList<Media>();
+ urls = new ArrayList<URL>();
+ user_mentions = new ArrayList<UserMention>();
+ symbols = new ArrayList<Symbol>();
+
+ }
+
+ public List<HashTags> getHashtags() {
+ return hashtags;
+ }
+
+ public void setHashtags(List<HashTags> hashtags) {
+ this.hashtags = hashtags;
+ }
+
+ public List<Media> getMedia() {
+ return media;
+ }
+
+ public void setMedia(List<Media> media) {
+ this.media = media;
+ }
+
+ public List<URL> getUrls() {
+ return urls;
+ }
+
+ public void setUrls(List<URL> urls) {
+ this.urls = urls;
+ }
+
+ public List<UserMention> getUser_mentions() {
+ return user_mentions;
+ }
+
+ public void setUser_mentions(List<UserMention> user_mentions) {
+ this.user_mentions = user_mentions;
+ }
+
+
+ public List<Symbol> getSymbols() {
+ return symbols;
+ }
+
+ public void setSymbols(List<Symbol> symbols) {
+ this.symbols = symbols;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
new file mode 100755
index 0000000..1900859
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
/**
 * Represents hashtags which have been parsed out of the
 * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text.
 */
public class HashTags {

	/** Start/end offsets of the hashtag; a two-element array by default. */
	private long[] indices = new long[2];

	/** The hashtag text. */
	private String text = "";

	/**
	 * Returns the backing array itself (not a copy).
	 *
	 * @return the {@code [start, end]} indices array
	 */
	public long[] getIndices() {
		return indices;
	}

	/**
	 * Replaces the backing array; stored as given with no copy, length or null check.
	 *
	 * @param indices the new {@code [start, end]} array
	 */
	public void setIndices(long[] indices) {
		this.indices = indices;
	}

	/**
	 * Stores a start/end pair.
	 *
	 * <p>If the current array can hold two values it is written in place. If it is {@code null} or
	 * shorter than two elements (possible after {@link #setIndices(long[])}), a fresh array is
	 * allocated instead of failing with an exception.
	 *
	 * @param start the start index
	 * @param end   the end index
	 */
	public void setIndices(long start, long end) {
		if (indices == null || indices.length < 2) {
			indices = new long[2];
		}
		indices[0] = start;
		indices[1] = end;
	}

	/** @return the hashtag text */
	public String getText() {
		return text;
	}

	/**
	 * Sets the hashtag text.
	 *
	 * @param text      the source text
	 * @param hashExist if {@code true}, stores {@code text.substring(indices[0] + 1)}; otherwise
	 *                  stores {@code text} unchanged
	 */
	public void setText(String text, boolean hashExist) {
		if (hashExist) {
			// NOTE(review): this drops the first indices[0] + 1 characters and keeps everything up to
			// the end of `text` (not up to indices[1]). That strips a leading '#' when indices[0] == 0;
			// confirm what callers pass before relying on it for full tweet text.
			this.text = text.substring((int) indices[0] + 1);
		} else {
			this.text = text;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
new file mode 100755
index 0000000..f006aac
--- /dev/null
+++ b/flink-contrib/flink-tweet-inputformat/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents media elements uploaded with the {@link package org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
+ */
+public class Media {
+
+
+ private String display_url = "";
+
+ private String expanded_url = "";
+
+ private long id;
+
+ private String id_str = "";
+
+ private long[] indices;
+
+ private String media_url = "";
+
+ private String media_url_https = "";
+
+ private Map<String, Size> sizes;
+
+ private String type = "";
+
+ private String url = "";
+
+ public Media() {
+
+ this.display_url = "";
+ this.expanded_url = "";
+ this.id = 0L;
+ this.id_str = "";
+ this.setIndices(new long[]{0L, 0L});
+ this.media_url = "";
+ this.media_url_https = "";
+ this.sizes = new HashMap<String, Size>();
+ this.type = "";
+ this.url = "";
+
+ }
+
+ public String getDisplay_url() {
+ return display_url;
+ }
+
+ public void setDisplay_url(String display_url) {
+ this.display_url = display_url;
+ }
+
+ public String getExpanded_url() {
+ return expanded_url;
+ }
+
+ public void setExpanded_url(String expanded_url) {
+ this.expanded_url = expanded_url;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public void setId(long id) {
+ this.id = id;
+ }
+
+ public String getId_str() {
+ return id_str;
+ }
+
+ public void setId_str(String id_str) {
+ this.id_str = id_str;
+ }
+
+ public long[] getIndices() {
+ return indices;
+ }
+
+ public void setIndices(long[] indices) {
+ this.indices = indices;
+ }
+
+ public String getMedia_url() {
+ return media_url;
+ }
+
+ public void setMedia_url(String media_url) {
+ this.media_url = media_url;
+ }
+
+ public String getMedia_url_https() {
+ return media_url_https;
+ }
+
+ public void setMedia_url_https(String media_url_https) {
+ this.media_url_https = media_url_https;
+ }
+
+ public Map<String, Size> getSizes() {
+ return sizes;
+ }
+
+ public void setSizes(Map<String, Size> sizes) {
+ this.sizes = sizes;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+}