You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:36 UTC
[18/18] git commit: MRQL-16: correct source files, ASF licenses,
and POMs for release
MRQL-16: correct source files, ASF licenses, and POMs for release
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/3de9e485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/3de9e485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/3de9e485
Branch: refs/heads/master
Commit: 3de9e4853a211fa3ee8e7470c9295d8b12240a73
Parents: 1de63a6
Author: fegaras <fe...@cse.uta.edu>
Authored: Fri Sep 6 15:56:43 2013 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Fri Sep 6 15:56:43 2013 -0500
----------------------------------------------------------------------
Makefile | 2 +-
build.xml | 1 +
pom.xml | 6 +-
queries/RMAT.mrql | 18 +
queries/dblp-pagerank.mrql | 20 +-
queries/kmeans.mrql | 18 +
queries/kmeans2.mrql | 18 +
queries/pagerank.mrql | 18 +
queries/pagerank2.mrql | 18 +
queries/points.mrql | 18 +
src/main/assembly/bin.xml | 1 +
src/main/java/BSP/BSPPlan.java | 840 +++---
src/main/java/BSP/BinaryInputFormat.java | 44 +-
src/main/java/BSP/Evaluator.gen | 234 +-
src/main/java/BSP/GeneratorInputFormat.java | 90 +-
src/main/java/BSP/MRQLFileInputFormat.java | 122 +-
src/main/java/BSP/MultipleBSPInput.java | 32 +-
src/main/java/BSP/ParsedInputFormat.java | 116 +-
src/main/java/MapReduce/BinaryInputFormat.java | 76 +-
.../java/MapReduce/CrossProductOperation.java | 348 +--
src/main/java/MapReduce/Evaluator.gen | 404 +--
.../java/MapReduce/GeneratorInputFormat.java | 118 +-
src/main/java/MapReduce/GroupByJoinPlan.java | 614 ++---
src/main/java/MapReduce/JoinOperation.java | 604 ++---
.../java/MapReduce/MRQLFileInputFormat.java | 16 +-
src/main/java/MapReduce/MapJoinOperation.java | 304 +--
src/main/java/MapReduce/MapOperation.java | 166 +-
src/main/java/MapReduce/MapReduceOperation.java | 406 +--
src/main/java/MapReduce/MapReducePlan.java | 120 +-
src/main/java/MapReduce/ParsedInputFormat.java | 164 +-
src/main/java/core/AlgebraicOptimization.gen | 488 ++--
src/main/java/core/BSPTranslator.gen | 1246 ++++-----
src/main/java/core/Bag.java | 648 ++---
src/main/java/core/BagIterator.java | 2 +-
src/main/java/core/BinaryDataSource.java | 6 +-
src/main/java/core/ClassImporter.java | 232 +-
src/main/java/core/Compiler.gen | 966 +++----
src/main/java/core/Config.java | 340 +--
src/main/java/core/DataSet.java | 48 +-
src/main/java/core/DataSource.java | 166 +-
src/main/java/core/Environment.java | 12 +-
src/main/java/core/GeneratorDataSource.java | 22 +-
src/main/java/core/Interpreter.gen | 1386 +++++-----
src/main/java/core/Inv.java | 20 +-
src/main/java/core/JsonParser.java | 82 +-
src/main/java/core/JsonSplitter.java | 146 +-
src/main/java/core/Lambda.java | 8 +-
src/main/java/core/LineParser.gen | 310 +--
src/main/java/core/MRContainer.java | 148 +-
src/main/java/core/MRData.java | 6 +-
src/main/java/core/MRQL.java | 32 +-
src/main/java/core/MR_bool.java | 24 +-
src/main/java/core/MR_byte.java | 22 +-
src/main/java/core/MR_char.java | 20 +-
src/main/java/core/MR_dataset.java | 16 +-
src/main/java/core/MR_double.java | 26 +-
src/main/java/core/MR_float.java | 26 +-
src/main/java/core/MR_int.java | 30 +-
src/main/java/core/MR_long.java | 32 +-
src/main/java/core/MR_more_bsp_steps.java | 4 +-
src/main/java/core/MR_short.java | 20 +-
src/main/java/core/MR_string.java | 28 +-
src/main/java/core/MR_sync.java | 2 +-
src/main/java/core/MR_variable.java | 2 +-
src/main/java/core/Main.java | 238 +-
src/main/java/core/MapReduceAlgebra.java | 1076 ++++----
src/main/java/core/Materialization.gen | 162 +-
src/main/java/core/MethodInfo.java | 38 +-
src/main/java/core/Normalization.gen | 664 ++---
src/main/java/core/ParsedDataSource.java | 46 +-
src/main/java/core/Plan.java | 484 ++--
src/main/java/core/PlanGeneration.gen | 1244 ++++-----
src/main/java/core/Printer.gen | 794 +++---
src/main/java/core/QueryPlan.gen | 1524 +++++------
src/main/java/core/Simplification.gen | 646 ++---
src/main/java/core/SystemFunctions.java | 404 +--
src/main/java/core/Test.java | 144 +-
src/main/java/core/TopLevel.gen | 430 +--
src/main/java/core/Translator.gen | 474 ++--
src/main/java/core/Tuple.java | 212 +-
src/main/java/core/TypeInference.gen | 2468 +++++++++---------
src/main/java/core/Union.java | 40 +-
src/main/java/core/XMLParser.java | 102 +-
src/main/java/core/XMLSplitter.java | 172 +-
src/main/java/core/XPathParser.java | 1170 ++++-----
src/main/java/core/mrql.cgen | 566 ++--
src/main/java/gen/Condition.java | 12 +-
src/main/java/gen/Crypt.java | 58 +-
src/main/java/gen/DoubleLeaf.java | 16 +-
src/main/java/gen/LongLeaf.java | 16 +-
src/main/java/gen/Main.java | 68 +-
src/main/java/gen/Meta.java | 362 +--
src/main/java/gen/Node.java | 46 +-
src/main/java/gen/StringLeaf.java | 12 +-
src/main/java/gen/SymbolTable.java | 188 +-
src/main/java/gen/Tree.java | 30 +-
src/main/java/gen/Trees.java | 174 +-
src/main/java/gen/VariableLeaf.java | 14 +-
src/main/java/spark/BinaryInputFormat.java | 34 +-
src/main/java/spark/Evaluator.gen | 1056 ++++----
src/main/java/spark/GeneratorInputFormat.java | 124 +-
src/main/java/spark/MRQLFileInputFormat.java | 76 +-
src/main/java/spark/MR_rdd.java | 8 +-
src/main/java/spark/ParsedInputFormat.java | 106 +-
src/main/java/spark/RDDDataSource.java | 6 +-
tests/queries/core_1.mrql | 18 +
tests/queries/distinct_1.mrql | 18 +
tests/queries/factorization_1.mrql | 18 +
tests/queries/group_by_1.mrql | 18 +
tests/queries/group_by_having_1.mrql | 18 +
tests/queries/group_by_order_by_1.mrql | 18 +
tests/queries/group_by_order_by_2.mrql | 18 +
tests/queries/join_group_by_1.mrql | 18 +
tests/queries/join_group_by_2.mrql | 18 +
tests/queries/join_group_by_3.mrql | 18 +
tests/queries/join_order_by_1.mrql | 18 +
tests/queries/joins_1.mrql | 18 +
tests/queries/kmeans_1.mrql | 18 +
tests/queries/matrix_1.mrql | 18 +
tests/queries/nested_select_1.mrql | 18 +
tests/queries/order_by_1.mrql | 18 +
tests/queries/pagerank_1.mrql | 18 +
tests/queries/relational_join_1.mrql | 18 +
tests/queries/total_aggregation_1.mrql | 18 +
tests/queries/total_aggregation_2.mrql | 18 +
tests/queries/udf_1.mrql | 18 +
tests/queries/user_aggregation_1.mrql | 18 +
tests/queries/xml_1.mrql | 18 +
tests/queries/xml_2.mrql | 18 +
129 files changed, 14010 insertions(+), 13448 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/Makefile
----------------------------------------------------------------------
diff --git a/Makefile b/Makefile
index 531eac8..767755b 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,7 @@ common: clean_build gen mrql_parser json_parser
clean_build:
@rm -rf classes tmp
- @mkdir -p classes tmp tests/results tests/results/mr-memory tests/results/bsp-memory tests/results/hadoop tests/results/bsp tests/results/spark
+ @mkdir -p lib classes tmp tests/results tests/results/mr-memory tests/results/bsp-memory tests/results/hadoop tests/results/bsp tests/results/spark
gen:
@${JFLEX} src/main/java/gen/gen.lex -d tmp
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 6482916..1cc4f78 100644
--- a/build.xml
+++ b/build.xml
@@ -124,6 +124,7 @@
description="Clean the build directories">
<delete dir="classes"/>
<delete dir="tmp"/>
+ <mkdir dir="lib"/>
<mkdir dir="classes"/>
<mkdir dir="tmp"/>
<mkdir dir="tests/results"/>
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 448bea0..b8709aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
<version>2.2</version>
<configuration>
<mavenExecutorId>forked-path</mavenExecutorId>
+ <autoVersionSubmodules>true</autoVersionSubmodules>
</configuration>
</plugin>
<plugin>
@@ -123,8 +124,9 @@
<exclude>.gitignore</exclude>
<exclude>.classpath/**</exclude>
<exclude>.project/**</exclude>
- <exclude>tests/**</exclude>
- <exclude>queries/**</exclude>
+ <exclude>tests/data/**</exclude>
+ <exclude>tests/results/**</exclude>
+ <exclude>tests/error_log.txt</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
<exclude>**/docs/apidocs/**</exclude>
</excludes>
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/RMAT.mrql
----------------------------------------------------------------------
diff --git a/queries/RMAT.mrql b/queries/RMAT.mrql
index 771737b..1fb1181 100644
--- a/queries/RMAT.mrql
+++ b/queries/RMAT.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// Kronecker graph generator parameters:
a = 0.57;
b = 0.19;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/dblp-pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/dblp-pagerank.mrql b/queries/dblp-pagerank.mrql
index 5cbd672..cd6ae85 100644
--- a/queries/dblp-pagerank.mrql
+++ b/queries/dblp-pagerank.mrql
@@ -1,4 +1,22 @@
-DBLP = source(xml,"/home/fegaras/data/dblp2.xml",{"article","incollection","book","inproceedings"});
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DBLP = source(xml,args[0],{"article","incollection","book","inproceedings"});
graph = select (key,select text(x) from x in c)
from a in DBLP,
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/kmeans.mrql
----------------------------------------------------------------------
diff --git a/queries/kmeans.mrql b/queries/kmeans.mrql
index 3934f0c..dd85fd4 100644
--- a/queries/kmeans.mrql
+++ b/queries/kmeans.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
type point = < X: double, Y: double >;
function distance ( x: point, y: point ): double {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/kmeans2.mrql
----------------------------------------------------------------------
diff --git a/queries/kmeans2.mrql b/queries/kmeans2.mrql
index d34f86c..306a6f6 100644
--- a/queries/kmeans2.mrql
+++ b/queries/kmeans2.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
type point = < X: double, Y: double, Z: double >;
function distance ( x: point, y: point ): double {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/pagerank.mrql b/queries/pagerank.mrql
index ce4e46d..ba1707c 100644
--- a/queries/pagerank.mrql
+++ b/queries/pagerank.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// preprocessing: for each node, group its outgoing links into a bag
graph = select (key,n#1)
from n in source(binary,"graph.bin")
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/pagerank2.mrql
----------------------------------------------------------------------
diff --git a/queries/pagerank2.mrql b/queries/pagerank2.mrql
index d28a6fd..9f38aad 100644
--- a/queries/pagerank2.mrql
+++ b/queries/pagerank2.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// preprocessing: for each node, group its outgoing links into a bag
graph = select (key,select x.to from x in n)
from n in source(line,"queries/links.txt",",",type(<id:string,to:string>))
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/points.mrql
----------------------------------------------------------------------
diff --git a/queries/points.mrql b/queries/points.mrql
index b29c81d..155623e 100644
--- a/queries/points.mrql
+++ b/queries/points.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// generate random points in the 4 squares: [2..4,6..8]*[2..4,6..8]
// used in kmeans.mrql
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/bin.xml b/src/main/assembly/bin.xml
index aba99d9..0a0192a 100644
--- a/src/main/assembly/bin.xml
+++ b/src/main/assembly/bin.xml
@@ -31,6 +31,7 @@
<include>LICENSE*</include>
<include>NOTICE*</include>
<include>conf/mrql-env.sh</include>
+ <include>**/apidocs/**</include>
</includes>
<excludes>
<exclude>**/*~</exclude>
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPPlan.java b/src/main/java/BSP/BSPPlan.java
index 4ccd95d..0ad30e9 100644
--- a/src/main/java/BSP/BSPPlan.java
+++ b/src/main/java/BSP/BSPPlan.java
@@ -32,364 +32,364 @@ import org.apache.hadoop.conf.Configuration;
final public class BSPPlan extends Plan {
final static Configuration getConfiguration ( BSPJob job ) {
- return job.getConf(); // use job.getConfiguration() for Hama 0.6.0
+ return job.getConf(); // use job.getConfiguration() for Hama 0.6.0
}
/** The BSP evaluator */
final static class BSPop extends BSP<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> {
- final static MRContainer null_key = new MRContainer(new MR_byte(0));
- // special message for sub-sync()
- final static MRData more_to_come = new MR_sync();
- final static MRData more_supersteps = new MR_more_bsp_steps();
+ final static MRContainer null_key = new MRContainer(new MR_byte(0));
+ // special message for sub-sync()
+ final static MRData more_to_come = new MR_sync();
+ final static MRData more_supersteps = new MR_more_bsp_steps();
- private int source_num;
- private Function superstep_fnc; // superstep function
- private MRData state; // BSP state
- private boolean orderp; // will output be ordered?
- private MRData source; // BSP input
- private Function acc_fnc; // aggregator
- private MRData acc_result; // aggregation result
- private static String[] all_peer_names; // all BSP peer names
- private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers; // all BSP peers
- // a master peer that coordinates and collects results of partial aggregations
- private String masterTask;
- // buffer for received messages -- regularly in a vector, but can be spilled in a local file
- Bag msg_cache;
- // the cache that holds all local data in memory
- Tuple local_cache;
+ private int source_num;
+ private Function superstep_fnc; // superstep function
+ private MRData state; // BSP state
+ private boolean orderp; // will output be ordered?
+ private MRData source; // BSP input
+ private Function acc_fnc; // aggregator
+ private MRData acc_result; // aggregation result
+ private static String[] all_peer_names; // all BSP peer names
+ private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers; // all BSP peers
+ // a master peer that coordinates and collects results of partial aggregations
+ private String masterTask;
+ // buffer for received messages -- regularly in a vector, but can be spilled in a local file
+ Bag msg_cache;
+ // the cache that holds all local data in memory
+ Tuple local_cache;
- private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
- for ( int i = 0; i < all_peer_names.length; i++ )
- if (all_peer_names[i].equals(name))
- return all_peers[i];
- throw new Error("Unknown peer: "+name);
- }
+ private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
+ for ( int i = 0; i < all_peer_names.length; i++ )
+ if (all_peer_names[i].equals(name))
+ return all_peers[i];
+ throw new Error("Unknown peer: "+name);
+ }
- private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
- String name = peer.getPeerName();
- for ( int i = 0; i < all_peer_names.length; i++ )
- if (all_peer_names[i].equals(name))
- all_peers[i] = peer;
- }
+ private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
+ String name = peer.getPeerName();
+ for ( int i = 0; i < all_peer_names.length; i++ )
+ if (all_peer_names[i].equals(name))
+ all_peers[i] = peer;
+ }
- /** shuffle values to BSP peers based on uniform hashing on key */
- private static String shuffle ( MRData key ) {
- return all_peer_names[Math.abs(key.hashCode()) % all_peer_names.length];
- }
+ /** shuffle values to BSP peers based on uniform hashing on key */
+ private static String shuffle ( MRData key ) {
+ return all_peer_names[Math.abs(key.hashCode()) % all_peer_names.length];
+ }
- /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
- public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
- return synchronize(getPeer(peerName.get()),mr_exit);
- }
+ /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
+ public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+ return synchronize(getPeer(peerName.get()),mr_exit);
+ }
- /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
- public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
- if (!Config.hadoop_mode)
- return mr_exit;
- // shortcut: if we know for sure that all peers want to exit/continue, we don't have to poll
- if (mr_exit == SystemFunctions.bsp_true_value // must be ==, not equals
- || mr_exit == SystemFunctions.bsp_false_value)
- return mr_exit;
- try {
- // this case is only used for checking the exit condition of repeat/closure
- boolean exit = mr_exit.get();
- if (all_peer_names.length <= 1)
- return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
- if (!exit)
- // this peer is not ready to exit, so no peer should exit
- for ( String p: peer.getAllPeerNames() )
- peer.send(p,new MRContainer(more_supersteps));
- peer.sync();
- // now exit is true if no peer sent a "more_supersteps" message
- exit = peer.getNumCurrentMessages() == 0;
- peer.clear();
- return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
+ /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
+ public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
+ if (!Config.hadoop_mode)
+ return mr_exit;
+ // shortcut: if we know for sure that all peers want to exit/continue, we don't have to poll
+ if (mr_exit == SystemFunctions.bsp_true_value // must be ==, not equals
+ || mr_exit == SystemFunctions.bsp_false_value)
+ return mr_exit;
+ try {
+ // this case is only used for checking the exit condition of repeat/closure
+ boolean exit = mr_exit.get();
+ if (all_peer_names.length <= 1)
+ return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+ if (!exit)
+ // this peer is not ready to exit, so no peer should exit
+ for ( String p: peer.getAllPeerNames() )
+ peer.send(p,new MRContainer(more_supersteps));
+ peer.sync();
+ // now exit is true if no peer sent a "more_supersteps" message
+ exit = peer.getNumCurrentMessages() == 0;
+ peer.clear();
+ return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
- /** collect a bag from all peers by distributing the local copy s */
- public static Bag distribute ( MR_string peerName, Bag s ) {
- BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
- if (!Config.hadoop_mode)
- return s;
- try {
- for ( MRData e: s )
- for ( String p: all_peer_names )
- peer.send(p,new MRContainer(e));
- peer.sync();
- MRContainer msg;
- Bag res = new Bag();
- while ((msg = peer.getCurrentMessage()) != null)
- if (!res.contains(msg.data()))
- res.add(msg.data());
- peer.clear();
- return res;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
+ /** collect a bag from all peers by distributing the local copy s */
+ public static Bag distribute ( MR_string peerName, Bag s ) {
+ BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
+ if (!Config.hadoop_mode)
+ return s;
+ try {
+ for ( MRData e: s )
+ for ( String p: all_peer_names )
+ peer.send(p,new MRContainer(e));
+ peer.sync();
+ MRContainer msg;
+ Bag res = new Bag();
+ while ((msg = peer.getCurrentMessage()) != null)
+ if (!res.contains(msg.data()))
+ res.add(msg.data());
+ peer.clear();
+ return res;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
- private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException {
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- while (peer.readNext(key,value)) {
- Tuple p = (Tuple)(value.data());
- ((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
- }
- }
+ private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+ throws IOException {
+ MRContainer key = new MRContainer();
+ MRContainer value = new MRContainer();
+ while (peer.readNext(key,value)) {
+ Tuple p = (Tuple)(value.data());
+ ((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
+ }
+ }
- private void writeLocalResult ( Bag result,
- BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException {
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- for ( MRData v: result )
- if (orderp) { // prepare for sorting
- Tuple t = (Tuple)v;
- key.set(t.get(1));
- value.set(t.get(0));
- peer.write(key,value);
- } else {
- value.set(v);
- peer.write(null_key,value);
- }
- }
+ private void writeLocalResult ( Bag result,
+ BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+ throws IOException {
+ MRContainer key = new MRContainer();
+ MRContainer value = new MRContainer();
+ for ( MRData v: result )
+ if (orderp) { // prepare for sorting
+ Tuple t = (Tuple)v;
+ key.set(t.get(1));
+ value.set(t.get(0));
+ peer.write(key,value);
+ } else {
+ value.set(v);
+ peer.write(null_key,value);
+ }
+ }
- /** receive messages from other peers */
- private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- if (Config.bsp_msg_size <= 0) { // no buffering
- msg_cache = new Bag(new BagIterator() {
- MRContainer msg;
- public boolean hasNext () {
- try {
- return (msg = peer.getCurrentMessage()) != null;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
- public MRData next () {
- return msg.data();
- }
- });
- } else {
- boolean expect_more = false; // are we expecting more incoming messages?
- do {
- // just in case this peer did a regular-sync() before the others did a sub-sync()
- expect_more = false;
- MRContainer msg;
- // cache the received messages
- while ((msg = peer.getCurrentMessage()) != null)
- // if at least one peer sends a more_to_come message, then expect_more
- if (msg.data().equals(more_to_come))
- expect_more = true;
- else msg_cache.add(msg.data());
- if (expect_more)
- peer.sync(); // sub-sync()
- } while (expect_more);
- }
- }
+ /** receive messages from other peers */
+ private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+ throws IOException, SyncException, InterruptedException {
+ if (Config.bsp_msg_size <= 0) { // no buffering
+ msg_cache = new Bag(new BagIterator() {
+ MRContainer msg;
+ public boolean hasNext () {
+ try {
+ return (msg = peer.getCurrentMessage()) != null;
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ }
+ public MRData next () {
+ return msg.data();
+ }
+ });
+ } else {
+ boolean expect_more = false; // are we expecting more incoming messages?
+ do {
+ // just in case this peer did a regular-sync() before the others did a sub-sync()
+ expect_more = false;
+ MRContainer msg;
+ // cache the received messages
+ while ((msg = peer.getCurrentMessage()) != null)
+ // if at least one peer sends a more_to_come message, then expect_more
+ if (msg.data().equals(more_to_come))
+ expect_more = true;
+ else msg_cache.add(msg.data());
+ if (expect_more)
+ peer.sync(); // sub-sync()
+ } while (expect_more);
+ }
+ }
- /** send the messages produced by a superstep to peers and then receive the replies */
- private void send_messages ( Bag msgs,
- BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- int size = 0;
- if (Config.bsp_msg_size > 0)
- msg_cache.clear();
- for ( MRData m: msgs ) {
- Tuple t = (Tuple)m;
- // if there are too many messages to send, then sub-sync()
- if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) {
- // tell all peers that there is more to come after sync
- for ( String p: all_peer_names )
- if (!peer.getPeerName().equals(p))
- peer.send(p,new MRContainer(more_to_come));
- peer.sync(); // sub-sync()
- size = 0;
- MRContainer msg;
- // cache the received messages
- while ((msg = peer.getCurrentMessage()) != null)
- if (!msg.data().equals(more_to_come))
- msg_cache.add(msg.data());
- };
- // suffle messages based on key; needs new MRContainer object
- peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
- };
- peer.sync(); // regular-sync()
- receive_messages(peer);
- }
+ /** send the messages produced by a superstep to peers and then receive the replies; each element of msgs is a (key,value) tuple whose value is sent to the peer chosen by shuffle(key) */
+ private void send_messages ( Bag msgs,
+ BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+ throws IOException, SyncException, InterruptedException {
+ int size = 0; // messages sent since the last sub-sync
+ if (Config.bsp_msg_size > 0)
+ msg_cache.clear(); // messages received during sub-syncs are collected into msg_cache below, so start empty
+ for ( MRData m: msgs ) {
+ Tuple t = (Tuple)m;
+ // if there are too many messages to send, then sub-sync()
+ if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) { // NOTE(review): because of the post-increment, up to bsp_msg_size+2 messages go out between sub-syncs
+ // tell all peers that there is more to come after sync
+ for ( String p: all_peer_names )
+ if (!peer.getPeerName().equals(p))
+ peer.send(p,new MRContainer(more_to_come));
+ peer.sync(); // sub-sync()
+ size = 0;
+ MRContainer msg;
+ // cache the received messages
+ while ((msg = peer.getCurrentMessage()) != null)
+ if (!msg.data().equals(more_to_come)) // more_to_come markers are control messages, not data
+ msg_cache.add(msg.data());
+ };
+ // shuffle messages based on key; needs new MRContainer object
+ peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
+ };
+ peer.sync(); // regular-sync()
+ receive_messages(peer); // collect the delivered messages into msg_cache (it sub-syncs again while expect_more is set)
+ }
- @Override
- public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- final Tuple stepin = new Tuple(4);
- stepin.set(3,new MR_string(peer.getPeerName()));
- Tuple result;
- boolean skip = false;
- String tabs = "";
- int step = 0;
- boolean exit;
- readLocalSources(peer);
- setPeer(peer);
- do {
- if (!skip)
- step++;
- if (!skip && Config.trace_execution) {
- tabs = Interpreter.tabs(Interpreter.tab_count);
- System.err.println(tabs+" Superstep "+step+" ["+peer.getPeerName()+"]:");
- System.err.println(tabs+" messages ["+peer.getPeerName()+"]: "+msg_cache);
- System.err.println(tabs+" state ["+peer.getPeerName()+"]: "+state);
- for ( int i = 0; i < local_cache.size(); i++)
- if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
- System.out.println(tabs+" cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
- };
- stepin.set(0,local_cache);
- stepin.set(1,msg_cache);
- stepin.set(2,state);
- // evaluate one superstep
- result = (Tuple)superstep_fnc.eval(stepin);
- Bag msgs = (Bag)result.get(0);
- exit = ((MR_bool)result.get(2)).get();
- state = result.get(1);
- // shortcuts: if we know for sure that all peers want to exit/continue
- if (result.get(2) == SystemFunctions.bsp_true_value) { // must be ==, not equals
- peer.sync();
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- break;
- };
- if (result.get(2) == SystemFunctions.bsp_false_value) {
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- send_messages(msgs,peer);
- skip = false;
- continue;
- };
- // shortcut: skip is true when NONE of the peers sent any messages
- skip = (msgs == SystemFunctions.bsp_empty_bag); // must be ==, not equals
- if (skip)
- continue;
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- exit = synchronize(peer,(MR_bool)result.get(2)).get();
- send_messages(msgs,peer);
- } while (!exit);
- if (acc_result == null) {
- // the BSP result is a bag that needs to be dumped to the HDFS
- writeLocalResult((Bag)(local_cache.get(source_num)),peer);
- // if there more results, dump them to HDFS
- final MR_long key = new MR_long(0);
- final MRContainer key_container = new MRContainer(key);
- final MRContainer data_container = new MRContainer(new MR_int(0));
- int loc = 0;
- while ( loc < all_peer_names.length && peer.getPeerName().equals(all_peer_names[loc]) )
- loc++;
- Configuration conf = peer.getConfiguration();
- String[] out_paths = conf.get("mrql.output.paths").split(",");
- for ( int i = 1; i < out_paths.length; i++ ) {
- String[] s = out_paths[i].split(":");
- int out_num = Integer.parseInt(s[0]);
- Path path = new Path(s[1]+"/peer"+loc);
- FileSystem fs = path.getFileSystem(conf);
- SequenceFile.Writer writer
- = new SequenceFile.Writer(fs,conf,path,
- MRContainer.class,MRContainer.class);
- long count = 0;
- for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
- key.set(count++);
- data_container.set(e);
- writer.append(key_container,data_container);
- };
- writer.close();
- };
- } else {
- // the BSP result is an aggregation:
- // send the partial results to the master peer
- peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
- peer.sync();
- if (peer.getPeerName().equals(masterTask)) {
- // only the master peer collects the partial aggregations
- MRContainer msg;
- while ((msg = peer.getCurrentMessage()) != null)
- acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
- // write the final aggregation result
- peer.write(null_key,new MRContainer(acc_result));
- }
- }
- }
+ @Override
+ public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) // run all supersteps of this peer, then emit its result
+ throws IOException, SyncException, InterruptedException {
+ final Tuple stepin = new Tuple(4); // superstep input: (local cache, received messages, state, peer name)
+ stepin.set(3,new MR_string(peer.getPeerName()));
+ Tuple result;
+ boolean skip = false; // set when the previous iteration took the no-messages shortcut below
+ String tabs = "";
+ int step = 0;
+ boolean exit;
+ readLocalSources(peer);
+ setPeer(peer);
+ do {
+ if (!skip)
+ step++;
+ if (!skip && Config.trace_execution) {
+ tabs = Interpreter.tabs(Interpreter.tab_count);
+ System.err.println(tabs+" Superstep "+step+" ["+peer.getPeerName()+"]:");
+ System.err.println(tabs+" messages ["+peer.getPeerName()+"]: "+msg_cache);
+ System.err.println(tabs+" state ["+peer.getPeerName()+"]: "+state);
+ for ( int i = 0; i < local_cache.size(); i++)
+ if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
+ System.err.println(tabs+" cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i)); // stderr, like the other trace lines
+ };
+ stepin.set(0,local_cache);
+ stepin.set(1,msg_cache);
+ stepin.set(2,state);
+ // evaluate one superstep
+ result = (Tuple)superstep_fnc.eval(stepin);
+ Bag msgs = (Bag)result.get(0); // result = (messages to send, new state, exit flag)
+ exit = ((MR_bool)result.get(2)).get();
+ state = result.get(1);
+ // shortcuts: if we know for sure that all peers want to exit/continue
+ if (result.get(2) == SystemFunctions.bsp_true_value) { // must be ==, not equals
+ peer.sync();
+ if (Config.trace_execution)
+ System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
+ break;
+ };
+ if (result.get(2) == SystemFunctions.bsp_false_value) {
+ if (Config.trace_execution)
+ System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
+ send_messages(msgs,peer);
+ skip = false;
+ continue;
+ };
+ // shortcut: skip is true when NONE of the peers sent any messages
+ skip = (msgs == SystemFunctions.bsp_empty_bag); // must be ==, not equals
+ if (skip)
+ continue;
+ if (Config.trace_execution)
+ System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
+ exit = synchronize(peer,(MR_bool)result.get(2)).get(); // presumably agrees on one exit flag with the other peers — TODO confirm
+ send_messages(msgs,peer);
+ } while (!exit);
+ if (acc_result == null) {
+ // the BSP result is a bag that needs to be dumped to the HDFS
+ writeLocalResult((Bag)(local_cache.get(source_num)),peer);
+ // if there are more results, dump them to HDFS
+ final MR_long key = new MR_long(0);
+ final MRContainer key_container = new MRContainer(key);
+ final MRContainer data_container = new MRContainer(new MR_int(0));
+ int loc = 0; // position of this peer in the sorted all_peer_names; gives each peer its own output file
+ while ( loc < all_peer_names.length && !peer.getPeerName().equals(all_peer_names[loc]) )
+ loc++;
+ Configuration conf = peer.getConfiguration();
+ String[] out_paths = conf.get("mrql.output.paths").split(","); // "tag:path" entries; entry 0 (tag source_num) is handled by writeLocalResult above
+ for ( int i = 1; i < out_paths.length; i++ ) {
+ String[] s = out_paths[i].split(":",2); // limit 2: the path itself may contain ':'
+ int out_num = Integer.parseInt(s[0]);
+ Path path = new Path(s[1]+"/peer"+loc);
+ FileSystem fs = path.getFileSystem(conf);
+ SequenceFile.Writer writer
+ = new SequenceFile.Writer(fs,conf,path,
+ MRContainer.class,MRContainer.class);
+ try { long count = 0; // the finally below closes the writer even if an append fails
+ for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
+ key.set(count++);
+ data_container.set(e);
+ writer.append(key_container,data_container);
+ };
+ } finally { writer.close(); }
+ };
+ } else {
+ // the BSP result is an aggregation:
+ // send the partial results to the master peer
+ peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
+ peer.sync();
+ if (peer.getPeerName().equals(masterTask)) {
+ // only the master peer collects the partial aggregations
+ MRContainer msg;
+ while ((msg = peer.getCurrentMessage()) != null)
+ acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
+ // write the final aggregation result
+ peer.write(null_key,new MRContainer(acc_result));
+ }
+ }
+ }
- @Override
- @SuppressWarnings("unchecked")
- public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
- try {
- super.setup(peer);
- Configuration conf = peer.getConfiguration();
- Config.read(conf);
- if (Plan.conf == null)
- Plan.conf = conf;
- all_peer_names = peer.getAllPeerNames();
- all_peers = new BSPPeerImpl[all_peer_names.length];
- Arrays.sort(all_peer_names); // is this necessary?
- source_num = conf.getInt("mrql.output.tag",0);
- Tree code = Tree.parse(conf.get("mrql.superstep"));
- superstep_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.initial.state"));
- state = Interpreter.evalE(code);
- if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) {
- code = Tree.parse(conf.get("mrql.zero"));
- acc_result = Interpreter.evalE(code);
- code = Tree.parse(conf.get("mrql.accumulator"));
- acc_fnc = functional_argument(conf,code);
- } else acc_result = null;
- orderp = conf.getBoolean("mrql.orderp",false);
- masterTask = all_peer_names[peer.getNumPeers()/2];
- msg_cache = new Bag(1000);
- local_cache = new Tuple(max_input_files);
- for ( int i = 0; i < max_input_files; i++ )
- local_cache.set(i,new Bag());
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Cannot setup the Hama BSP job: "+e);
- }
- }
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) { // read the job configuration and initialize the per-peer state
+ try {
+ super.setup(peer);
+ Configuration conf = peer.getConfiguration();
+ Config.read(conf);
+ if (Plan.conf == null)
+ Plan.conf = conf;
+ all_peer_names = peer.getAllPeerNames();
+ all_peers = new BSPPeerImpl[all_peer_names.length];
+ Arrays.sort(all_peer_names); // a fixed order lets every peer pick the same masterTask below; only required if getAllPeerNames() order can differ between peers — TODO confirm
+ source_num = conf.getInt("mrql.output.tag",0);
+ Tree code = Tree.parse(conf.get("mrql.superstep"));
+ superstep_fnc = functional_argument(conf,code);
+ code = Tree.parse(conf.get("mrql.initial.state"));
+ state = Interpreter.evalE(code);
+ if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) { // a zero value means the result is aggregated
+ code = Tree.parse(conf.get("mrql.zero"));
+ acc_result = Interpreter.evalE(code);
+ code = Tree.parse(conf.get("mrql.accumulator"));
+ acc_fnc = functional_argument(conf,code);
+ } else acc_result = null; // no aggregation: bsp() dumps the result bags to files
+ orderp = conf.getBoolean("mrql.orderp",false);
+ masterTask = all_peer_names[peer.getNumPeers()/2];
+ msg_cache = new Bag(1000);
+ local_cache = new Tuple(max_input_files);
+ for ( int i = 0; i < max_input_files; i++ )
+ local_cache.set(i,new Bag());
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Cannot setup the Hama BSP job: "+e,e); // keep e as the cause
+ }
+ }
- @Override
- public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
- if (!Config.local_mode)
- clean();
- local_cache = null;
- super.cleanup(peer);
- }
+ @Override
+ public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException { // per-peer teardown hook overridden from Hama's BSP
+ if (!Config.local_mode)
+ clean(); // skipped in local mode; NOTE(review): clean() is defined elsewhere — confirm what it removes
+ local_cache = null; // drop the per-peer data cache so it can be garbage-collected
+ super.cleanup(peer);
+ }
}
/** set Hama's min split size and number of BSP tasks (doesn't work with Hama 0.6.0) */
public static void setupSplits ( BSPJob job, DataSet ds ) throws IOException {
- long[] sizes = new long[ds.source.size()];
- if (sizes.length > Config.nodes)
- throw new Error("Cannot distribute "+sizes.length+" files over "+Config.nodes+" BSP tasks");
- for ( int i = 0; i < sizes.length; i++ )
- sizes[i] = ds.source.get(i).size(Plan.conf);
- long total_size = 0;
- for ( long size: sizes )
- total_size += size;
- long split_size = Math.max(total_size/Config.nodes,100000);
- int tasks = 0;
- do { // adjust split_size
- tasks = 0;
- for ( long size: sizes )
- tasks += (int)Math.ceil(size/(double)split_size);
- if (tasks > Config.nodes)
- split_size = (long)Math.ceil((double)split_size*1.01);
- } while (tasks > Config.nodes);
- job.setNumBspTask(tasks);
- System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
- +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
- +" bytes of input data.");
- job.set("bsp.min.split.size",Long.toString(split_size));
+ long[] sizes = new long[ds.source.size()]; // input size (bytes) of each data source
+ if (sizes.length > Config.nodes) // refuse more input files than available BSP tasks
+ throw new Error("Cannot distribute "+sizes.length+" files over "+Config.nodes+" BSP tasks");
+ for ( int i = 0; i < sizes.length; i++ )
+ sizes[i] = ds.source.get(i).size(Plan.conf);
+ long total_size = 0;
+ for ( long size: sizes )
+ total_size += size;
+ long split_size = Math.max(total_size/Config.nodes,100000); // even spread over all nodes, but at least 100000 bytes per split
+ int tasks = 0;
+ do { // adjust split_size: grow it by 1% until the number of splits fits in Config.nodes tasks
+ tasks = 0;
+ for ( long size: sizes )
+ tasks += (int)Math.ceil(size/(double)split_size); // splits needed for this source
+ if (tasks > Config.nodes)
+ split_size = (long)Math.ceil((double)split_size*1.01);
+ } while (tasks > Config.nodes);
+ job.setNumBspTask(tasks);
+ System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
+ +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
+ +" bytes of input data.");
+ job.set("bsp.min.split.size",Long.toString(split_size)); // Hama's min split size (see the method comment)
}
/** Evaluate a BSP operation that returns a DataSet
@@ -401,52 +401,52 @@ final public class BSPPlan extends Plan {
* @return a new data source that contains the result
*/
public final static MRData BSP ( int[] source_nums, // output tags
- Tree superstep, // superstep function
- Tree init_state, // initial state
- boolean orderp, // do we need to order the result?
- DataSet source // input dataset
- ) throws Exception {
- String[] newpaths = new String[source_nums.length];
- newpaths[0] = new_path(conf);
- conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]);
- for ( int i = 1; i < source_nums.length; i++ ) {
- newpaths[i] = new_path(conf);
- Path path = new Path(newpaths[1]);
- FileSystem fs = path.getFileSystem(conf);
- fs.mkdirs(path);
- conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
- };
- conf.set("mrql.superstep",superstep.toString());
- conf.set("mrql.initial.state",init_state.toString());
- conf.set("mrql.zero","");
- conf.setInt("mrql.output.tag",source_nums[0]);
- conf.setBoolean("mrql.orderp",orderp);
- BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
- setupSplits(job,source);
- job.setJobName(newpaths[0]);
- distribute_compiled_arguments(getConfiguration(job));
- job.setBspClass(BSPop.class);
- Path outpath = new Path(newpaths[0]);
- job.setOutputPath(outpath);
- job.setOutputKeyClass(MRContainer.class);
- job.setOutputValueClass(MRContainer.class);
- job.setOutputFormat(SequenceFileOutputFormat.class);
- job.setInputFormat(MultipleBSPInput.class);
- FileInputFormat.setInputPaths(job,source.merge());
- job.waitForCompletion(true);
- if (source_nums.length == 1) {
- BinaryDataSource ds = new BinaryDataSource(source_nums[0],newpaths[0],conf);
- ds.to_be_merged = orderp;
- return new MR_dataset(new DataSet(ds,0,3));
- } else {
- MRData[] s = new MRData[source_nums.length];
- for ( int i = 0; i < source_nums.length; i++ ) {
- BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
- ds.to_be_merged = orderp;
- s[i] = new MR_dataset(new DataSet(ds,0,3));
- };
- return new Tuple(s);
- }
+ Tree superstep, // superstep function
+ Tree init_state, // initial state
+ boolean orderp, // do we need to order the result?
+ DataSet source // input dataset
+ ) throws Exception {
+ String[] newpaths = new String[source_nums.length]; // one output path per output tag
+ newpaths[0] = new_path(conf);
+ conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]); // comma-separated "tag:path" list read by the BSP peers
+ for ( int i = 1; i < source_nums.length; i++ ) {
+ newpaths[i] = new_path(conf);
+ Path path = new Path(newpaths[i]); // this extra output's directory; the peers write <path>/peer<loc> files into it
+ FileSystem fs = path.getFileSystem(conf);
+ fs.mkdirs(path);
+ conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
+ };
+ conf.set("mrql.superstep",superstep.toString());
+ conf.set("mrql.initial.state",init_state.toString());
+ conf.set("mrql.zero",""); // empty zero: not an aggregation, so the peers dump their results to files
+ conf.setInt("mrql.output.tag",source_nums[0]);
+ conf.setBoolean("mrql.orderp",orderp);
+ BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
+ setupSplits(job,source);
+ job.setJobName(newpaths[0]);
+ distribute_compiled_arguments(getConfiguration(job));
+ job.setBspClass(BSPop.class);
+ Path outpath = new Path(newpaths[0]); // the main result goes to the job output path
+ job.setOutputPath(outpath);
+ job.setOutputKeyClass(MRContainer.class);
+ job.setOutputValueClass(MRContainer.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setInputFormat(MultipleBSPInput.class);
+ FileInputFormat.setInputPaths(job,source.merge());
+ job.waitForCompletion(true);
+ if (source_nums.length == 1) {
+ BinaryDataSource ds = new BinaryDataSource(source_nums[0],newpaths[0],conf);
+ ds.to_be_merged = orderp;
+ return new MR_dataset(new DataSet(ds,0,3));
+ } else {
+ MRData[] s = new MRData[source_nums.length]; // several outputs are returned as a tuple of datasets
+ for ( int i = 0; i < source_nums.length; i++ ) {
+ BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
+ ds.to_be_merged = orderp;
+ s[i] = new MR_dataset(new DataSet(ds,0,3));
+ };
+ return new Tuple(s);
+ }
}
/** Evaluate a BSP operation that aggregates the results
@@ -459,43 +459,43 @@ final public class BSPPlan extends Plan {
* @return the aggregation result
*/
public final static MRData BSPaggregate ( int source_num, // output tag
- Tree superstep, // superstep function
- Tree init_state, // initial state
- Tree acc_fnc, // accumulator function
- Tree zero, // zero value for the accumulator
- DataSet source // input dataset
- ) throws Exception {
- String newpath = new_path(conf);
- conf.set("mrql.superstep",superstep.toString());
- conf.set("mrql.initial.state",init_state.toString());
- conf.set("mrql.accumulator",acc_fnc.toString());
- conf.set("mrql.zero",zero.toString());
- conf.setInt("mrql.output.tag",source_num);
- conf.setBoolean("mrql.orderp",false);
- BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
- setupSplits(job,source);
- job.setJobName(newpath);
- distribute_compiled_arguments(getConfiguration(job));
- job.setBspClass(BSPop.class);
- Path outpath = new Path(newpath);
- job.setOutputPath(outpath);
- job.setOutputKeyClass(MRContainer.class);
- job.setOutputValueClass(MRContainer.class);
- job.setOutputFormat(SequenceFileOutputFormat.class);
- job.setInputFormat(MultipleBSPInput.class);
- FileInputFormat.setInputPaths(job,source.merge());
- job.waitForCompletion(true);
- FileSystem fs = outpath.getFileSystem(conf);
- FileStatus[] files = fs.listStatus(outpath);
- for ( int i = 0; i < files.length; i++ ) {
- SequenceFile.Reader sreader = new SequenceFile.Reader(fs,files[i].getPath(),conf);
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- sreader.next(key,value);
- sreader.close();
- if (value.data() != null)
- return value.data();
- };
- return null;
+ Tree superstep, // superstep function
+ Tree init_state, // initial state
+ Tree acc_fnc, // accumulator function
+ Tree zero, // zero value for the accumulator
+ DataSet source // input dataset
+ ) throws Exception {
+ String newpath = new_path(conf);
+ conf.set("mrql.superstep",superstep.toString());
+ conf.set("mrql.initial.state",init_state.toString());
+ conf.set("mrql.accumulator",acc_fnc.toString());
+ conf.set("mrql.zero",zero.toString()); // a non-empty zero puts the peers in aggregation mode
+ conf.setInt("mrql.output.tag",source_num);
+ conf.setBoolean("mrql.orderp",false);
+ BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
+ setupSplits(job,source);
+ job.setJobName(newpath);
+ distribute_compiled_arguments(getConfiguration(job));
+ job.setBspClass(BSPop.class);
+ Path outpath = new Path(newpath);
+ job.setOutputPath(outpath);
+ job.setOutputKeyClass(MRContainer.class);
+ job.setOutputValueClass(MRContainer.class);
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setInputFormat(MultipleBSPInput.class);
+ FileInputFormat.setInputPaths(job,source.merge());
+ job.waitForCompletion(true);
+ FileSystem fs = outpath.getFileSystem(conf);
+ FileStatus[] files = fs.listStatus(outpath);
+ for ( int i = 0; i < files.length; i++ ) { // only the master peer writes the aggregate, so return the first non-null value found
+ MRContainer key = new MRContainer();
+ MRContainer value = new MRContainer();
+ SequenceFile.Reader sreader = new SequenceFile.Reader(fs,files[i].getPath(),conf);
+ try { sreader.next(key,value); } // at most the first record of each part file is read
+ finally { sreader.close(); } // close the reader even if the read fails
+ if (value.data() != null)
+ return value.data();
+ };
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BinaryInputFormat.java b/src/main/java/BSP/BinaryInputFormat.java
index 27f62ee..7f2229a 100644
--- a/src/main/java/BSP/BinaryInputFormat.java
+++ b/src/main/java/BSP/BinaryInputFormat.java
@@ -29,32 +29,32 @@ import org.apache.hama.HamaConfiguration;
/** Input format for hadoop sequence files */
final public class BinaryInputFormat extends MRQLFileInputFormat {
public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
- final MRContainer result = new MRContainer();
- final MRData source_num_data;
- final int source_number;
+ final MRContainer result = new MRContainer(); // reusable buffer for the raw value read from the sequence file
+ final MRData source_num_data; // the source tag, as MRData, attached to every value
+ final int source_number; // NOTE(review): stored but not read elsewhere in this class
- public BinaryInputRecordReader ( FileSplit split,
- BSPJob job,
- int source_number ) throws IOException {
- super(BSPPlan.getConfiguration(job),split);
- this.source_number = source_number;
- source_num_data = new MR_int(source_number);
- }
+ public BinaryInputRecordReader ( FileSplit split,
+ BSPJob job,
+ int source_number ) throws IOException {
+ super(BSPPlan.getConfiguration(job),split);
+ this.source_number = source_number;
+ source_num_data = new MR_int(source_number);
+ }
- @Override
- public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
- boolean b = super.next(key,result);
- value.set(new Tuple(source_num_data,result.data()));
- return b;
- }
+ @Override
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ boolean b = super.next(key,result); // read the raw pair into key and the shared result buffer
+ value.set(new Tuple(source_num_data,result.data())); // tag the value as (source tag, data); NOTE(review): also runs when b is false, on stale data
+ return b;
+ }
}
public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split,
- BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- String path = ((FileSplit)split).getPath().toString();
- BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,conf);
- return new BinaryInputRecordReader((FileSplit)split,job,ds.source_num);
+ getRecordReader ( InputSplit split,
+ BSPJob job ) throws IOException {
+ Configuration conf = BSPPlan.getConfiguration(job);
+ String path = ((FileSplit)split).getPath().toString(); // the split's file path identifies its data source
+ BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,conf); // look up the registered source to recover its tag
+ return new BinaryInputRecordReader((FileSplit)split,job,ds.source_num);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/Evaluator.gen b/src/main/java/BSP/Evaluator.gen
index 5331ab0..c0aabb6 100644
--- a/src/main/java/BSP/Evaluator.gen
+++ b/src/main/java/BSP/Evaluator.gen
@@ -29,8 +29,8 @@ final public class Evaluator extends Interpreter {
/** initialize the BSP evaluator */
final static void init ( Configuration conf ) {
- Config.bsp_mode = true;
- if (Config.hadoop_mode)
+ Config.bsp_mode = true;
+ if (Config.hadoop_mode)
if (Config.local_mode) {
conf.set("bsp.master.address","local");
conf.set("hama.zookeeper.quorum","localhost");
@@ -52,7 +52,7 @@ final public class Evaluator extends Interpreter {
}
final static Configuration new_configuration () {
- return new HamaConfiguration();
+ return new HamaConfiguration(); // a Hama configuration: BSPPlan casts conf to HamaConfiguration when building a BSPJob
}
public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
@@ -73,30 +73,30 @@ final public class Evaluator extends Interpreter {
* @return a DataSet (stored in HDFS)
*/
final static MRData bsp ( Tree plan, Environment env ) throws Exception {
- match plan {
- case BSP(tuple(...ns),`superstep,`init_state,`o,...S):
- int[] os = new int[ns.length()];
- for ( int i = 0; i < os.length; i++ )
- os[i] = (int)((LongLeaf)ns.nth(i)).value();
- DataSet ds = eval(S.head(),env);
- for ( Tree s: S.tail() )
- ds.merge(eval(s,env));
- return BSPPlan.BSP(os,
- closure(superstep,env),
- init_state,
- o.equals(#<true>),
- ds);
- case BSP(`n,`superstep,`init_state,`o,...S):
- DataSet ds = eval(S.head(),env);
- for ( Tree s: S.tail() )
- ds.merge(eval(s,env));
- return BSPPlan.BSP(new int[]{(int)((LongLeaf)n).value()},
- closure(superstep,env),
- init_state,
- o.equals(#<true>),
- ds);
- }
- throw new Error("Cannot perform the BSP plan: "+plan);
+ match plan {
+ case BSP(tuple(...ns),`superstep,`init_state,`o,...S):
+ int[] os = new int[ns.length()]; // one output tag per tuple element
+ for ( int i = 0; i < os.length; i++ )
+ os[i] = (int)((LongLeaf)ns.nth(i)).value();
+ DataSet ds = eval(S.head(),env); // merge the datasets of all input plans into one
+ for ( Tree s: S.tail() )
+ ds.merge(eval(s,env));
+ return BSPPlan.BSP(os,
+ closure(superstep,env),
+ init_state,
+ o.equals(#<true>),
+ ds);
+ case BSP(`n,`superstep,`init_state,`o,...S):
+ DataSet ds = eval(S.head(),env);
+ for ( Tree s: S.tail() )
+ ds.merge(eval(s,env));
+ return BSPPlan.BSP(new int[]{(int)((LongLeaf)n).value()}, // a single output tag
+ closure(superstep,env),
+ init_state,
+ o.equals(#<true>),
+ ds);
+ }
+ throw new Error("Cannot perform the BSP plan: "+plan); // no BSP pattern matched
}
/** The Aggregate physical operator
@@ -107,32 +107,32 @@ final public class Evaluator extends Interpreter {
* @return the aggregation result of type T
*/
final static MRData aggregate ( Tree acc_fnc,
- Tree zero,
- Tree plan,
- Environment env ) throws Exception {
- match plan {
- case BSP(`n,`superstep,`init_state,`o,...S):
- DataSet ds = eval(S.head(),env,"-");
- for ( Tree s: S.tail() )
- ds.merge(eval(s,env,"-"));
- return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(),
- closure(superstep,env),
- init_state,
- closure(acc_fnc,env),
- zero,
- ds);
- }
- throw new Error("Cannot perform the aggregation: "+plan);
+ Tree zero,
+ Tree plan,
+ Environment env ) throws Exception {
+ match plan {
+ case BSP(`n,`superstep,`init_state,`o,...S):
+ DataSet ds = eval(S.head(),env,"-"); // the counter name is ignored by eval(e,env,counter)
+ for ( Tree s: S.tail() )
+ ds.merge(eval(s,env,"-"));
+ return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(), // o is unused: BSPaggregate never orders its result
+ closure(superstep,env),
+ init_state,
+ closure(acc_fnc,env),
+ zero,
+ ds);
+ }
+ throw new Error("Cannot perform the aggregation: "+plan); // no BSP pattern matched
}
final static Tuple loop ( Tree e, Environment env ) {
- throw new Error("The BSP Loop was supposed to be translated to a BSP task");
+ throw new Error("The BSP Loop was supposed to be translated to a BSP task"); // loops are not evaluated directly in BSP mode
}
final static DataSet eval ( final Tree e,
- final Environment env,
- final String counter ) {
- return eval(e,env);
+ final Environment env,
+ final String counter ) { // counter: accepted for interface compatibility, not used here
+ return eval(e,env); // the BSP evaluator ignores the counter name
}
/** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
@@ -142,20 +142,20 @@ final public class Evaluator extends Interpreter {
* @return a DataSet (stored in HDFS)
*/
final static DataSet eval ( final Tree e, final Environment env ) {
- if (Config.trace_execution) {
- tab_count += 3;
- System.out.println(tabs(tab_count)+print_query(e));
- };
- DataSet res = evalD(e,env);
- if (Config.trace_execution)
- try {
- System.out.println(tabs(tab_count)
- +"-> "+Plan.collect(res,false));
- tab_count -= 3;
- } catch (Exception ex) {
- throw new Error("Cannot collect the operator output: "+e);
- };
- return res;
+ if (Config.trace_execution) {
+ tab_count += 3; // indent the trace of nested operators
+ System.out.println(tabs(tab_count)+print_query(e));
+ };
+ DataSet res = evalD(e,env);
+ if (Config.trace_execution)
+ try {
+ System.out.println(tabs(tab_count)
+ +"-> "+Plan.collect(res,false)); // print the operator output
+ tab_count -= 3;
+ } catch (Exception ex) {
+ throw new Error("Cannot collect the operator output: "+e,ex); // keep ex as the cause
+ };
+ return res;
}
/** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
@@ -164,61 +164,61 @@ final public class Evaluator extends Interpreter {
* @return a DataSet (stored in HDFS)
*/
final static DataSet evalD ( final Tree e, final Environment env ) {
- try {
- match e {
- case BSPSource(`n,BinarySource(`file,_)):
- return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
- case BSPSource(`n,ParsedSource(`parser,`file,...args)):
- Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- return Plan.parsedSource((int)((LongLeaf)n).value(),
- p,((MR_string)evalE(file,env)).get(),args);
- case BSPSource(`n,Generator(`min,`max,`size)):
- return Plan.generator((int)((LongLeaf)n).value(),
- ((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get(),
- ((MR_long)evalE(size,env)).get());
- case Merge(`x,`y):
- return Plan.merge(eval(x,env),eval(y,env));
- case Dump(`s):
- return Plan.fileCache((Bag)evalE(s,env));
- case apply(`f,`arg):
- if (!f.is_variable())
- return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset();
- MRData fnc = variable_lookup(f.toString(),global_env);
- if (fnc == null)
- throw new Error("Unknown function: "+f);
- MRData t = evalE(arg,env);
- if (!(t instanceof Tuple))
- throw new Error("Expected a tuple in function application: "+t);
- return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
- case `v:
- if (!v.is_variable())
- fail;
- MRData x = variable_lookup(v.toString(),env);
- if (x != null)
- if (x instanceof MR_dataset)
- return ((MR_dataset)x).dataset();
- x = variable_lookup(v.toString(),global_env);
- if (x != null)
- if (x instanceof MR_dataset)
- return ((MR_dataset)x).dataset();
- throw new Error("Variable "+v+" is not bound");
- };
- MRData d = bsp(e,env);
- if (d instanceof MR_dataset)
- return ((MR_dataset)d).dataset();
- throw new Error("Cannot evaluate the BSP plan: "+e);
- } catch (Error msg) {
- if (!Config.trace)
- throw new Error(msg.getMessage());
- System.err.println(msg.getMessage());
- throw new Error("Evaluation error in: "+print_query(e));
- } catch (Exception ex) {
- System.err.println(ex.getMessage());
- ex.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- }
+ try {
+ match e {
+ case BSPSource(`n,BinarySource(`file,_)):
+ return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
+ case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); // parsers are registered by name
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return Plan.parsedSource((int)((LongLeaf)n).value(),
+ p,((MR_string)evalE(file,env)).get(),args);
+ case BSPSource(`n,Generator(`min,`max,`size)):
+ return Plan.generator((int)((LongLeaf)n).value(),
+ ((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get(),
+ ((MR_long)evalE(size,env)).get());
+ case Merge(`x,`y):
+ return Plan.merge(eval(x,env),eval(y,env));
+ case Dump(`s):
+ return Plan.fileCache((Bag)evalE(s,env));
+ case apply(`f,`arg):
+ if (!f.is_variable())
+ return ((MR_dataset)evalF(f,env).eval(evalE(arg,env))).dataset(); // evaluate arg in env, as in the named-function case below
+ MRData fnc = variable_lookup(f.toString(),global_env); // named functions are looked up in the global environment
+ if (fnc == null)
+ throw new Error("Unknown function: "+f);
+ MRData t = evalE(arg,env);
+ if (!(t instanceof Tuple))
+ throw new Error("Expected a tuple in function application: "+t);
+ return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+ case `v:
+ if (!v.is_variable())
+ fail;
+ MRData x = variable_lookup(v.toString(),env); // local bindings first, then globals
+ if (x != null)
+ if (x instanceof MR_dataset)
+ return ((MR_dataset)x).dataset();
+ x = variable_lookup(v.toString(),global_env);
+ if (x != null)
+ if (x instanceof MR_dataset)
+ return ((MR_dataset)x).dataset();
+ throw new Error("Variable "+v+" is not bound");
+ };
+ MRData d = bsp(e,env); // anything else must be a BSP plan
+ if (d instanceof MR_dataset)
+ return ((MR_dataset)d).dataset();
+ throw new Error("Cannot evaluate the BSP plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e),ex); // keep ex as the cause
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/GeneratorInputFormat.java b/src/main/java/BSP/GeneratorInputFormat.java
index cb13a83..ac55964 100644
--- a/src/main/java/BSP/GeneratorInputFormat.java
+++ b/src/main/java/BSP/GeneratorInputFormat.java
@@ -29,61 +29,61 @@ import org.apache.hama.bsp.*;
* an (offset,size) pair that generates the range of values [offset,offset+size] */
final public class GeneratorInputFormat extends MRQLFileInputFormat {
public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
- final long offset;
- final long size;
- final int source_number;
- final MRData source_num_data;
- long index;
- SequenceFile.Reader reader;
+ final long offset;
+ final long size;
+ final int source_number;
+ final MRData source_num_data;
+ long index;
+ SequenceFile.Reader reader;
- public GeneratorRecordReader ( FileSplit split,
- int source_number,
- BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- reader.next(key,value);
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- this.source_number = source_number;
- source_num_data = new MR_int(source_number);
- index = -1;
- }
+ public GeneratorRecordReader ( FileSplit split,
+ int source_number,
+ BSPJob job ) throws IOException {
+ Configuration conf = BSPPlan.getConfiguration(job);
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
+ MRContainer key = new MRContainer();
+ MRContainer value = new MRContainer();
+ reader.next(key,value);
+ offset = ((MR_long)((Tuple)(value.data())).first()).get();
+ size = ((MR_long)((Tuple)(value.data())).second()).get();
+ this.source_number = source_number;
+ source_num_data = new MR_int(source_number);
+ index = -1;
+ }
- public MRContainer createKey () {
- return new MRContainer(null);
- }
+ public MRContainer createKey () {
+ return new MRContainer(null);
+ }
- public MRContainer createValue () {
- return new MRContainer(null);
- }
+ public MRContainer createValue () {
+ return new MRContainer(null);
+ }
- public boolean next ( MRContainer key, MRContainer value ) throws IOException {
- index++;
- value.set(new Tuple(source_num_data,new MR_long(offset+index)));
- key.set(new MR_long(index));
- return index < size;
- }
+ public boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ index++;
+ value.set(new Tuple(source_num_data,new MR_long(offset+index)));
+ key.set(new MR_long(index));
+ return index < size;
+ }
- public long getPos () throws IOException { return index; }
+ public long getPos () throws IOException { return index; }
- public void close () throws IOException { reader.close(); }
+ public void close () throws IOException { reader.close(); }
- public float getProgress () throws IOException {
- return index / (float)size;
- }
+ public float getProgress () throws IOException {
+ return index / (float)size;
+ }
- public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
+ public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
}
public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- String path = ((FileSplit)split).getPath().toString();
- GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
- return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
+ getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
+ Configuration conf = BSPPlan.getConfiguration(job);
+ String path = ((FileSplit)split).getPath().toString();
+ GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
+ return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/MRQLFileInputFormat.java b/src/main/java/BSP/MRQLFileInputFormat.java
index c7a7abf..73c1fbb 100644
--- a/src/main/java/BSP/MRQLFileInputFormat.java
+++ b/src/main/java/BSP/MRQLFileInputFormat.java
@@ -31,38 +31,38 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
/** record reader for map-reduce */
abstract public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
+ getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
/** materialize the input file into a memory Bag */
public Bag materialize ( final Path file ) throws Exception {
- final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
- job.setInputPath(file);
- final InputSplit[] splits = getSplits(job,1);
- final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
- return new Bag(new BagIterator () {
- RecordReader<MRContainer,MRContainer> reader = rd;
- MRContainer key = reader.createKey();
- MRContainer value = reader.createKey();
- int i = 0;
- public boolean hasNext () {
- try {
- if (reader.next(key,value))
- return true;
- do {
- if (++i >= splits.length)
- return false;
- reader.close();
- reader = getRecordReader(splits[i],job);
- } while (!reader.next(key,value));
- return true;
- } catch (IOException e) {
- throw new Error("Cannot collect values from an intermediate result");
- }
- }
- public MRData next () {
- return value.data();
- }
- });
+ final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
+ job.setInputPath(file);
+ final InputSplit[] splits = getSplits(job,1);
+ final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
+ return new Bag(new BagIterator () {
+ RecordReader<MRContainer,MRContainer> reader = rd;
+ MRContainer key = reader.createKey();
+ MRContainer value = reader.createKey();
+ int i = 0;
+ public boolean hasNext () {
+ try {
+ if (reader.next(key,value))
+ return true;
+ do {
+ if (++i >= splits.length)
+ return false;
+ reader.close();
+ reader = getRecordReader(splits[i],job);
+ } while (!reader.next(key,value));
+ return true;
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from an intermediate result");
+ }
+ }
+ public MRData next () {
+ return value.data();
+ }
+ });
}
/** materialize the entire dataset into a Bag
@@ -71,37 +71,37 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
* @return the Bag that contains the collected values
*/
public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
- Bag res = new Bag();
- for ( DataSource s: x.source )
- if (s.to_be_merged)
- res = res.union(Plan.merge(s));
- else {
- Path path = new Path(s.path);
- final FileSystem fs = path.getFileSystem(Plan.conf);
- final FileStatus[] ds
- = fs.listStatus(path,
- new PathFilter () {
- public boolean accept ( Path path ) {
- return !path.getName().startsWith("_");
- }
- });
- Bag b = new Bag();
- for ( FileStatus st: ds )
- b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
- if (strip) {
- // remove source_num
- final Iterator<MRData> iter = b.iterator();
- b = new Bag(new BagIterator() {
- public boolean hasNext () {
- return iter.hasNext();
- }
- public MRData next () {
- return ((Tuple)iter.next()).get(1);
- }
- });
- };
- res = res.union(b);
- };
- return res;
+ Bag res = new Bag();
+ for ( DataSource s: x.source )
+ if (s.to_be_merged)
+ res = res.union(Plan.merge(s));
+ else {
+ Path path = new Path(s.path);
+ final FileSystem fs = path.getFileSystem(Plan.conf);
+ final FileStatus[] ds
+ = fs.listStatus(path,
+ new PathFilter () {
+ public boolean accept ( Path path ) {
+ return !path.getName().startsWith("_");
+ }
+ });
+ Bag b = new Bag();
+ for ( FileStatus st: ds )
+ b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
+ if (strip) {
+ // remove source_num
+ final Iterator<MRData> iter = b.iterator();
+ b = new Bag(new BagIterator() {
+ public boolean hasNext () {
+ return iter.hasNext();
+ }
+ public MRData next () {
+ return ((Tuple)iter.next()).get(1);
+ }
+ });
+ };
+ res = res.union(b);
+ };
+ return res;
}
}