You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2015/05/22 00:44:24 UTC
incubator-mrql git commit: [MRQL-72] Add support for stream input on
TCP sockets
Repository: incubator-mrql
Updated Branches:
refs/heads/master 9ad4e1bd8 -> a89375316
[MRQL-72] Add support for stream input on TCP sockets
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/a8937531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/a8937531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/a8937531
Branch: refs/heads/master
Commit: a8937531667014fe75078b50a5155e57a3f14ff2
Parents: 9ad4e1b
Author: fegaras <fe...@cse.uta.edu>
Authored: Thu May 21 17:33:58 2015 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Thu May 21 17:33:58 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/mrql/JsonFormatParser.java | 8 ++
.../main/java/org/apache/mrql/JsonSplitter.java | 14 +++
.../main/java/org/apache/mrql/LineParser.gen | 12 +++
core/src/main/java/org/apache/mrql/Parser.java | 1 +
.../java/org/apache/mrql/PlanGeneration.gen | 7 ++
.../main/java/org/apache/mrql/TypeInference.gen | 19 ++++
.../main/java/org/apache/mrql/XMLParser.java | 8 ++
.../main/java/org/apache/mrql/XMLSplitter.java | 14 +++
pom.xml | 2 +-
queries/streaming2.mrql | 22 +++++
.../java/org/apache/mrql/SparkSocketStream.java | 98 ++++++++++++++++++++
.../java/org/apache/mrql/SparkStreaming.gen | 4 +
12 files changed, 208 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/JsonFormatParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonFormatParser.java b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
index 77aa891..0eaea47 100644
--- a/core/src/main/java/org/apache/mrql/JsonFormatParser.java
+++ b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
@@ -68,6 +68,14 @@ public class JsonFormatParser implements Parser {
}
}
+ public void open ( String host, int port ) { // open a JSON source that is read from a TCP socket at host:port
+ try {
+ splitter = new JsonSplitter(tags,host,port,new DataOutputBuffer()); // the splitter opens the socket itself
+ } catch (Exception e) {
+ throw new Error(e); // any Exception is rethrown as an unchecked Error that wraps it
+ }
+ }
+
public Tree type () { return new VariableLeaf("JSON"); }
public String slice () {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonSplitter.java b/core/src/main/java/org/apache/mrql/JsonSplitter.java
index 7017e72..cbbacab 100644
--- a/core/src/main/java/org/apache/mrql/JsonSplitter.java
+++ b/core/src/main/java/org/apache/mrql/JsonSplitter.java
@@ -21,6 +21,7 @@ import org.apache.mrql.gen.*;
import java_cup.runtime.Symbol;
import java.util.Iterator;
import java.io.*;
+import java.net.Socket;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -65,6 +66,19 @@ final public class JsonSplitter implements Iterator<DataOutputBuffer> {
scanner = new JSONLex(in);
}
+    JsonSplitter ( String[] tags, String host, int port, DataOutputBuffer buffer ) { // reads JSON from a TCP socket at host:port
+        in_memory = true; // NOTE(review): presumably marks a source that is not an HDFS file split -- confirm against hasNext
+        try {
+            Socket s = new Socket(host,port); // not closed in this constructor: the lexer below keeps reading from it
+            in = s.getInputStream();
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the socket: "+host+":"+port,e); // keep the cause so connection failures can be diagnosed
+        }
+        this.tags = tags;
+        this.buffer = buffer;
+        scanner = new JSONLex(in); // the lexer takes its input directly from the socket stream
+    }
+
private long sync ( long start ) {
try {
long first_quote = -1;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/LineParser.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/LineParser.gen b/core/src/main/java/org/apache/mrql/LineParser.gen
index 5619b3f..5ca05b2 100644
--- a/core/src/main/java/org/apache/mrql/LineParser.gen
+++ b/core/src/main/java/org/apache/mrql/LineParser.gen
@@ -19,6 +19,7 @@ package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
+import java.net.Socket;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
@@ -145,6 +146,17 @@ public class LineParser implements Parser {
}
}
+ public void open ( String host, int port ) {
+ in_memory = true;
+ try {
+ Socket s = new Socket(host,port);
+ buffered_in = new BufferedReader(new InputStreamReader(s.getInputStream()),
+ 10000);
+ } catch ( Exception e ) {
+ throw new Error("Cannot open the socket: "+host+":"+port);
+ }
+ }
+
public void open ( FSDataInputStream fsin, long fstart, long fend ) {
in_memory = false;
this.fsin = fsin;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/Parser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Parser.java b/core/src/main/java/org/apache/mrql/Parser.java
index c1728f1..2817f50 100644
--- a/core/src/main/java/org/apache/mrql/Parser.java
+++ b/core/src/main/java/org/apache/mrql/Parser.java
@@ -27,6 +27,7 @@ interface Parser {
public Tree type ();
public void open ( String file );
public void open ( FSDataInputStream fsin, long start, long end );
+ public void open ( String host, int port );
public String slice ();
public Bag parse ( String s );
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 85e4528..c127b39 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -389,6 +389,13 @@ final public class PlanGeneration extends AlgebraicOptimization {
case call(stream,gen,`f,`len,`ulen):
return #<SequenceStream(`(makePlan(f)),`(makePlan(len)),
`(makePlan(ulen)))>;
+ case call(stream,`parser,`host,`port,...args):
+ if (!port.is_long())
+ fail;
+ Trees el = #[];
+ for ( Tree a: args )
+ el = el.append(makePlan(a));
+ return #<SocketStream(`parser,`(makePlan(host)),`port,...el)>;
case call(stream,`parser,`file,...args):
Trees el = #[];
for ( Tree a: args )
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 4f78c06..d12084c 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -621,6 +621,25 @@ public class TypeInference extends Translator {
Tree tp = type_inference(#<call(source,binary,`f)>);
((Node)e).children = ((Node)e).children.append(tp); // destructive
return tp;
+ case call(stream,`parser,`f,...args):
+ if (Config.stream_window == 0)
+ type_error(e,"Not in stream processing mode");
+ if (!parser.is_variable())
+ type_error(e,"The parser must be a constant name: "+print_query(parser));
+ if (unify(type_inference(f),#<string>) == null)
+ type_error(e,"The source file must be a string: "+print_query(f));
+ if (unify(type_inference(args.head()),#<int>) != null)
+ args = args.tail();
+ try {
+ Class<? extends Parser> pc = DataSource.parserDirectory.get(parser.toString());
+ if (pc == null)
+ type_error(e,"Unrecognized parser: "+parser);
+ Parser p = pc.newInstance();
+ p.initialize(args);
+ return #<Bag(`(p.type()))>;
+ } catch (Exception x) {
+ type_error(e,"Unrecognized parser type: "+parser);
+ }
case call(stream,...r):
if (Config.stream_window == 0)
type_error(e,"Not in stream processing mode");
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/XMLParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/XMLParser.java b/core/src/main/java/org/apache/mrql/XMLParser.java
index f55faba..54cf18e 100644
--- a/core/src/main/java/org/apache/mrql/XMLParser.java
+++ b/core/src/main/java/org/apache/mrql/XMLParser.java
@@ -83,6 +83,14 @@ public class XMLParser implements Parser {
}
}
+ public void open ( String host, int port ) { // open an XML source that is read from a TCP socket at host:port
+ try {
+ splitter = new XMLSplitter(tags,host,port,new DataOutputBuffer()); // the splitter opens the socket itself
+ } catch (Exception e) {
+ throw new Error(e); // any Exception is rethrown as an unchecked Error that wraps it
+ }
+ }
+
public Tree type () { return new VariableLeaf("XML"); }
public String slice () {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/XMLSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/XMLSplitter.java b/core/src/main/java/org/apache/mrql/XMLSplitter.java
index e407d85..caa2469 100644
--- a/core/src/main/java/org/apache/mrql/XMLSplitter.java
+++ b/core/src/main/java/org/apache/mrql/XMLSplitter.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.*;
import java.io.*;
+import java.net.Socket;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -67,6 +68,19 @@ final public class XMLSplitter implements Iterator<DataOutputBuffer> {
this.buffer = buffer;
}
+    XMLSplitter ( String[] tags, String host, int port, DataOutputBuffer buffer ) { // reads XML from a TCP socket at host:port
+        in_memory = true; // hasNext() tests this flag before it consults fsin, which is never set on this path
+        try {
+            Socket s = new Socket(host,port); // not closed in this constructor: the reader below keeps reading from it
+            in = new BufferedReader(new InputStreamReader(s.getInputStream()), // decodes with the platform default charset
+                                    10000); // size of the reader's buffer, in chars
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the socket: "+host+":"+port,e); // keep the cause so connection failures can be diagnosed
+        }
+        this.tags = tags;
+        this.buffer = buffer;
+    }
+
public boolean hasNext () {
try {
if (in_memory || fsin.getPos() < end)
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 685f219..072b62c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
<hadoop.version>1.2.1</hadoop.version>
<yarn.version>2.2.0</yarn.version>
<hama.version>0.6.4</hama.version>
- <spark.version>1.2.0</spark.version>
+ <spark.version>1.3.1</spark.version>
<scala.version>2.10</scala.version>
<flink.version>0.8.1</flink.version>
<skipTests>true</skipTests>
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/queries/streaming2.mrql
----------------------------------------------------------------------
diff --git a/queries/streaming2.mrql b/queries/streaming2.mrql
new file mode 100644
index 0000000..42af091
--- /dev/null
+++ b/queries/streaming2.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+select (k,avg(p.Y))
+from p in stream(line,"localhost",9999,"|",type( <X:long,Y:long > ))
+group by k: p.X;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkSocketStream.java b/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
new file mode 100644
index 0000000..74cfab3
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.HashMap;
+import java.util.ArrayList;
+import scala.Option;
+import scala.Some;
+import scala.None;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+
+
+/** A Spark InputDStream for a stream socket */
+public class SparkSocketStream extends JavaInputDStream<MRData> {
+ private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+
+ public static final class MRDataSocketStream extends InputDStream<MRData> {
+ private final String host;
+ private final int port;
+ private Parser parser;
+ private final JavaStreamingContext stream_context;
+
+ MRDataSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser_name, Trees args ) {
+ super(stream_context.ssc(),classtag);
+ this.host = host;
+ this.port = port;
+ this.stream_context = stream_context;
+ Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser_name);
+ if (parser_class == null)
+ throw new Error("Unknown parser: "+parser_name);
+ try {
+ parser = parser_class.newInstance();
+ parser.initialize(args);
+ parser.open(host,port);
+ } catch (Exception ex) {
+ throw new Error("Cannot create the parser");
+ }
+ }
+
+ @Override
+ public void start () {}
+
+ @Override
+ public void stop () {}
+
+ @Override
+ public Duration slideDuration () {
+ return new Duration(Config.stream_window);
+ }
+
+ @Override
+ public List dependencies () {
+ return Nil$.MODULE$;
+ }
+
+ @Override
+ public Option<RDD<MRData>> compute ( Time validTime ) {
+ long duration = validTime.milliseconds();
+ duration = Config.stream_window;
+ long ct = System.currentTimeMillis();
+ ArrayList<MRData> result = new ArrayList<MRData>();
+ while ( System.currentTimeMillis() - ct < duration ) {
+ String data = parser.slice();
+ for ( MRData x: parser.parse(data) )
+ result.add(x);
+ };
+ return new Some<RDD<MRData>>(SparkEvaluator.spark_context.parallelize(result).rdd());
+ }
+ }
+
+ SparkSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser, Trees args ) {
+ super(new MRDataSocketStream(stream_context,host,port,parser,args),classtag);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index f514a48..0663430 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -55,6 +55,10 @@ public class SparkStreaming extends SparkEvaluator {
throw new Error("Cannot dump source directory");
};
return new SparkFileInputStream(stream_context,path,false);
+ case SocketStream(`parser,`hostname,`port,...args):
+ String h = ((MR_string)evalE(hostname,env)).get();
+ int p = ((MR_int)evalE(port,env)).get();
+ return new SparkSocketStream(stream_context,h,p,parser.toString(),args);
};
throw new Error("Unknown stream source: "+print_query(source));
}