You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2015/05/22 00:44:24 UTC

incubator-mrql git commit: [MRQL-72] Add support for stream input on TCP sockets

Repository: incubator-mrql
Updated Branches:
  refs/heads/master 9ad4e1bd8 -> a89375316


[MRQL-72] Add support for stream input on TCP sockets


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/a8937531
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/a8937531
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/a8937531

Branch: refs/heads/master
Commit: a8937531667014fe75078b50a5155e57a3f14ff2
Parents: 9ad4e1b
Author: fegaras <fe...@cse.uta.edu>
Authored: Thu May 21 17:33:58 2015 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Thu May 21 17:33:58 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/mrql/JsonFormatParser.java  |  8 ++
 .../main/java/org/apache/mrql/JsonSplitter.java | 14 +++
 .../main/java/org/apache/mrql/LineParser.gen    | 12 +++
 core/src/main/java/org/apache/mrql/Parser.java  |  1 +
 .../java/org/apache/mrql/PlanGeneration.gen     |  7 ++
 .../main/java/org/apache/mrql/TypeInference.gen | 19 ++++
 .../main/java/org/apache/mrql/XMLParser.java    |  8 ++
 .../main/java/org/apache/mrql/XMLSplitter.java  | 14 +++
 pom.xml                                         |  2 +-
 queries/streaming2.mrql                         | 22 +++++
 .../java/org/apache/mrql/SparkSocketStream.java | 98 ++++++++++++++++++++
 .../java/org/apache/mrql/SparkStreaming.gen     |  4 +
 12 files changed, 208 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/JsonFormatParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonFormatParser.java b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
index 77aa891..0eaea47 100644
--- a/core/src/main/java/org/apache/mrql/JsonFormatParser.java
+++ b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
@@ -68,6 +68,14 @@ public class JsonFormatParser implements Parser {
         }
     }
 
+    public void open ( String host, int port ) {
+        try {
+            splitter = new JsonSplitter(tags,host,port,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
     public Tree type () { return new VariableLeaf("JSON"); }
 
     public String slice () {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonSplitter.java b/core/src/main/java/org/apache/mrql/JsonSplitter.java
index 7017e72..cbbacab 100644
--- a/core/src/main/java/org/apache/mrql/JsonSplitter.java
+++ b/core/src/main/java/org/apache/mrql/JsonSplitter.java
@@ -21,6 +21,7 @@ import org.apache.mrql.gen.*;
 import java_cup.runtime.Symbol;
 import java.util.Iterator;
 import java.io.*;
+import java.net.Socket;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.io.DataOutputBuffer;
 
@@ -65,6 +66,19 @@ final public class JsonSplitter implements Iterator<DataOutputBuffer> {
         scanner = new JSONLex(in);
     }
 
+    JsonSplitter ( String[] tags, String host, int port, DataOutputBuffer buffer ) {
+        in_memory = true;
+        try {
+            Socket s = new Socket(host,port);
+            in = s.getInputStream();
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the socket: "+host+":"+port);
+        };
+        this.tags = tags;
+        this.buffer = buffer;
+        scanner = new JSONLex(in);
+    }
+
     private long sync ( long start ) {
         try {
             long first_quote = -1;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/LineParser.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/LineParser.gen b/core/src/main/java/org/apache/mrql/LineParser.gen
index 5619b3f..5ca05b2 100644
--- a/core/src/main/java/org/apache/mrql/LineParser.gen
+++ b/core/src/main/java/org/apache/mrql/LineParser.gen
@@ -19,6 +19,7 @@ package org.apache.mrql;
 
 import org.apache.mrql.gen.*;
 import java.io.*;
+import java.net.Socket;
 import java.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
@@ -145,6 +146,17 @@ public class LineParser implements Parser {
         }
     }
 
+    public void open ( String host, int port ) {
+        in_memory = true;
+        try {
+            Socket s = new Socket(host,port);
+            buffered_in = new BufferedReader(new InputStreamReader(s.getInputStream()),
+                                             10000);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the socket: "+host+":"+port);
+        }
+    }
+
     public void open ( FSDataInputStream fsin, long fstart, long fend ) {
         in_memory = false;
         this.fsin = fsin;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/Parser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Parser.java b/core/src/main/java/org/apache/mrql/Parser.java
index c1728f1..2817f50 100644
--- a/core/src/main/java/org/apache/mrql/Parser.java
+++ b/core/src/main/java/org/apache/mrql/Parser.java
@@ -27,6 +27,7 @@ interface Parser {
     public Tree type ();
     public void open ( String file );
     public void open ( FSDataInputStream fsin, long start, long end );
+    public void open ( String host, int port );
     public String slice ();
     public Bag parse ( String s );
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 85e4528..c127b39 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -389,6 +389,13 @@ final public class PlanGeneration extends AlgebraicOptimization {
        case call(stream,gen,`f,`len,`ulen):
            return #<SequenceStream(`(makePlan(f)),`(makePlan(len)),
                                    `(makePlan(ulen)))>;
+       case call(stream,`parser,`host,`port,...args):
+           if (!port.is_long())
+               fail;
+           Trees el = #[];
+           for ( Tree a: args )
+               el = el.append(makePlan(a));
+           return #<SocketStream(`parser,`(makePlan(host)),`port,...el)>;
        case call(stream,`parser,`file,...args):
            Trees el = #[];
            for ( Tree a: args )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 4f78c06..d12084c 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -621,6 +621,25 @@ public class TypeInference extends Translator {
             Tree tp = type_inference(#<call(source,binary,`f)>);
             ((Node)e).children = ((Node)e).children.append(tp);       // destructive
             return tp;
+        case call(stream,`parser,`f,...args):
+            if (Config.stream_window == 0)
+                type_error(e,"Not in stream processing mode");
+            if (!parser.is_variable())
+                type_error(e,"The parser must be a constant name: "+print_query(parser));
+            if (unify(type_inference(f),#<string>) == null)
+                type_error(e,"The source file must be a string: "+print_query(f));
+            if (unify(type_inference(args.head()),#<int>) != null)
+                args = args.tail();
+            try {
+                Class<? extends Parser> pc = DataSource.parserDirectory.get(parser.toString());
+                if (pc == null)
+                    type_error(e,"Unrecognized parser: "+parser);
+                Parser p = pc.newInstance();
+                p.initialize(args);
+                return #<Bag(`(p.type()))>;
+            } catch (Exception x) {
+                type_error(e,"Unrecognized parser type: "+parser);
+            }
         case call(stream,...r):
             if (Config.stream_window == 0)
                 type_error(e,"Not in stream processing mode");

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/XMLParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/XMLParser.java b/core/src/main/java/org/apache/mrql/XMLParser.java
index f55faba..54cf18e 100644
--- a/core/src/main/java/org/apache/mrql/XMLParser.java
+++ b/core/src/main/java/org/apache/mrql/XMLParser.java
@@ -83,6 +83,14 @@ public class XMLParser implements Parser {
         }
     }
 
+    public void open ( String host, int port ) {
+        try {
+            splitter = new XMLSplitter(tags,host,port,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
     public Tree type () { return new VariableLeaf("XML"); }
 
     public String slice () {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/core/src/main/java/org/apache/mrql/XMLSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/XMLSplitter.java b/core/src/main/java/org/apache/mrql/XMLSplitter.java
index e407d85..caa2469 100644
--- a/core/src/main/java/org/apache/mrql/XMLSplitter.java
+++ b/core/src/main/java/org/apache/mrql/XMLSplitter.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import javax.xml.parsers.SAXParserFactory;
 import org.xml.sax.*;
 import java.io.*;
+import java.net.Socket;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 
@@ -67,6 +68,19 @@ final public class XMLSplitter implements Iterator<DataOutputBuffer> {
         this.buffer = buffer;
     }
 
+    XMLSplitter ( String[] tags, String host, int port, DataOutputBuffer buffer ) {
+        in_memory = true;
+        try {
+            Socket s = new Socket(host,port);
+            in = new BufferedReader(new InputStreamReader(s.getInputStream()),
+                                    10000);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the socket: "+host+":"+port);
+        };
+        this.tags = tags;
+        this.buffer = buffer;
+    }
+
     public boolean hasNext () {
         try {
             if (in_memory || fsin.getPos() < end)

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 685f219..072b62c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <hadoop.version>1.2.1</hadoop.version>
     <yarn.version>2.2.0</yarn.version>
     <hama.version>0.6.4</hama.version>
-    <spark.version>1.2.0</spark.version>
+    <spark.version>1.3.1</spark.version>
     <scala.version>2.10</scala.version>
     <flink.version>0.8.1</flink.version>
     <skipTests>true</skipTests>

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/queries/streaming2.mrql
----------------------------------------------------------------------
diff --git a/queries/streaming2.mrql b/queries/streaming2.mrql
new file mode 100644
index 0000000..42af091
--- /dev/null
+++ b/queries/streaming2.mrql
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* For each distinct X, the average Y of the <X,Y> records streamed from the TCP socket localhost:9999 */
+select (k,avg(p.Y))
+from p in stream(line,"localhost",9999,"|",type( <X:long,Y:long > ))
+group by k: p.X;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkSocketStream.java b/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
new file mode 100644
index 0000000..74cfab3
--- /dev/null
+++ b/spark/src/main/java/org/apache/mrql/SparkSocketStream.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.HashMap;
+import java.util.ArrayList;
+import scala.Option;
+import scala.Some;
+import scala.None;
+import scala.collection.immutable.List;
+import scala.collection.immutable.Nil$;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.streaming.*;
+import org.apache.spark.streaming.api.java.*;
+import org.apache.spark.streaming.dstream.*;
+
+
+/** A Spark InputDStream for a stream socket */
+public class SparkSocketStream extends JavaInputDStream<MRData> {
+    private final static scala.reflect.ClassTag<MRData> classtag = scala.reflect.ClassTag$.MODULE$.apply(MRData.class);
+
+    public static final class MRDataSocketStream extends InputDStream<MRData> {
+        private final String host;
+        private final int port;
+        private Parser parser;
+        private final JavaStreamingContext stream_context;
+
+        MRDataSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser_name, Trees args ) {
+            super(stream_context.ssc(),classtag);
+            this.host = host;
+            this.port = port;
+            this.stream_context = stream_context;
+            Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser_name);
+            if (parser_class == null)
+                throw new Error("Unknown parser: "+parser_name);
+            try {
+                parser = parser_class.newInstance();
+                parser.initialize(args);
+                parser.open(host,port);
+	    } catch (Exception ex) {
+		throw new Error("Cannot create the parser");
+	    }
+        }
+
+        @Override
+        public void start () {}
+
+        @Override
+        public void stop () {}
+
+        @Override
+        public Duration slideDuration () {
+            return new Duration(Config.stream_window);
+        }
+
+        @Override
+        public List dependencies () {
+            return Nil$.MODULE$;
+        }
+
+        @Override
+        public Option<RDD<MRData>> compute ( Time validTime ) {
+            long duration = validTime.milliseconds();
+            duration = Config.stream_window;
+            long ct = System.currentTimeMillis();
+            ArrayList<MRData> result = new ArrayList<MRData>();
+            while ( System.currentTimeMillis() - ct < duration ) {
+                String data = parser.slice();
+                for ( MRData x: parser.parse(data) )
+                    result.add(x);
+            };
+            return new Some<RDD<MRData>>(SparkEvaluator.spark_context.parallelize(result).rdd());
+        }
+    }
+
+    SparkSocketStream ( JavaStreamingContext stream_context, String host, int port, String parser, Trees args ) {
+        super(new MRDataSocketStream(stream_context,host,port,parser,args),classtag);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a8937531/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index f514a48..0663430 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -55,6 +55,10 @@ public class SparkStreaming extends SparkEvaluator {
 		throw new Error("Cannot dump source directory");
 	    };
             return new SparkFileInputStream(stream_context,path,false);
+        case SocketStream(`parser,`hostname,`port,...args):
+            String h = ((MR_string)evalE(hostname,env)).get();
+            int p = ((MR_int)evalE(port,env)).get();
+            return new SparkSocketStream(stream_context,h,p,parser.toString(),args);
         };
         throw new Error("Unknown stream source: "+print_query(source));
     }