You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/01/04 01:01:03 UTC
[1/4] incubator-systemml git commit: Additional improvements to the naive
Bayes script (w/o append, cleanups)
Repository: incubator-systemml
Updated Branches:
refs/heads/master 8e7b6ed3d -> ff2aea542
Additional improvements to the naive Bayes script (w/o append, cleanups)
Performance test suite results for the old/new naive Bayes scripts (including
invocation overhead):
a) Hybrid Spark (20GB driver)
10k x 1k, dense: 2s -> 2s
10k x 1k, sparse: 1s -> 1s
100k x 1k, dense: 4s -> 4s
100k x 1k, sparse: 2s -> 2s
1M x 1k, dense: 42s -> 17s
1M x 1k, sparse: 4s -> 4s
10M x 1k, dense: 81s -> 78s
10M x 1k, sparse: 40s -> 27s
b) Hybrid MapReduce (2GB client)
10k x 1k, dense: 3s -> 3s
10k x 1k, sparse: 1s -> 1s
100k x 1k, dense: 23s -> 4s
100k x 1k, sparse: 3s -> 2s
1M x 1k, dense: 84s -> 81s
1M x 1k, sparse: 9s -> 6s
10M x 1k, dense: 139s -> 133s
10M x 1k, sparse: 80s -> 71s
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/f514f821
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/f514f821
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/f514f821
Branch: refs/heads/master
Commit: f514f821ceddc5c6c41fa9a67abf60f16314afef
Parents: 8e7b6ed
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 14:16:42 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 14:16:42 2016 -0800
----------------------------------------------------------------------
scripts/algorithms/naive-bayes.dml | 59 ++++++++------------
.../applications/naive-bayes/naive-bayes.dml | 25 +++------
.../applications/naive-bayes/naive-bayes.pydml | 31 +++++-----
3 files changed, 46 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/scripts/algorithms/naive-bayes.dml
----------------------------------------------------------------------
diff --git a/scripts/algorithms/naive-bayes.dml b/scripts/algorithms/naive-bayes.dml
index f2ad2ed..8284453 100644
--- a/scripts/algorithms/naive-bayes.dml
+++ b/scripts/algorithms/naive-bayes.dml
@@ -26,70 +26,59 @@
#
# defaults
-cmdLine_laplace = ifdef($laplace, 1)
-cmdLine_fmt = ifdef($fmt, "text")
+laplaceCorrection = ifdef($laplace, 1)
+fmt = ifdef($fmt, "text")
# reading input args
D = read($X)
-min_feature_val = min(D)
-if(min_feature_val < 0)
- stop("Stopping due to invalid argument: Multinomial naive Bayes is meant for count-based feature values, minimum value in X is negative")
+C = read($Y)
numRows = nrow(D)
+numFeatures = ncol(D)
+minFeatureVal = min(D)
+numClasses = max(C)
+minLabelVal = min(C)
+
+# sanity checks of data and arguments
+if(minFeatureVal < 0)
+ stop("Stopping due to invalid argument: Multinomial naive Bayes is meant for count-based feature values, minimum value in X is negative")
if(numRows < 2)
stop("Stopping due to invalid inputs: Not possible to learn a classifier without at least 2 rows")
-
-C = read($Y)
-if(min(C) < 1)
+if(minLabelVal < 1)
stop("Stopping due to invalid argument: Label vector (Y) must be recoded")
-numClasses = max(C)
if(numClasses == 1)
stop("Stopping due to invalid argument: Maximum label value is 1, need more than one class to learn a multi-class classifier")
-mod1 = C %% 1
-mod1_should_be_nrow = sum(abs(ppred(mod1, 0, "==")))
-if(mod1_should_be_nrow != numRows)
+if(sum(abs(C%%1 == 0)) != numRows)
stop("Stopping due to invalid argument: Please ensure that Y contains (positive) integral labels")
-
-laplace_correction = cmdLine_laplace
-if(laplace_correction < 0)
+if(laplaceCorrection < 0)
stop("Stopping due to invalid argument: Laplacian correction (laplace) must be non-negative")
-numFeatures = ncol(D)
-
# Compute conditionals
-
# Compute the feature counts for each class
classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=as.integer(numClasses))
# Compute the total feature count for each class
# and add the number of features to this sum
# for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
# Compute class conditional probabilities
-#ones = matrix(1, rows=1, cols=numFeatures)
-#repClassSums = classSums %*% ones
-#class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
-class_conditionals = (classFeatureCounts + laplace_correction) / classSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
# Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
-class_prior = class_counts / numRows;
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
+classPrior = classCounts / numRows;
# Compute accuracy on training set
-ones = matrix(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_probs = D_w_ones %*% t(log(model))
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+logProbs = D %*% t(log(classConditionals)) + t(log(classPrior));
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
acc_str = "Training Accuracy (%): " + acc
print(acc_str)
write(acc_str, $accuracy)
-extra_model_params = as.matrix(numFeatures)
-class_prior = t(append(t(class_prior), extra_model_params))
+extraModelParams = as.matrix(numFeatures)
+classPrior = rbind(classPrior, extraModelParams)
# write out the model
-write(class_prior, $prior, format=cmdLine_fmt);
-write(class_conditionals, $conditionals, format=cmdLine_fmt);
+write(classPrior, $prior, format=fmt);
+write(classConditionals, $conditionals, format=fmt);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/src/test/scripts/applications/naive-bayes/naive-bayes.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/naive-bayes/naive-bayes.dml b/src/test/scripts/applications/naive-bayes/naive-bayes.dml
index 9b1558c..1fe1bf4 100644
--- a/src/test/scripts/applications/naive-bayes/naive-bayes.dml
+++ b/src/test/scripts/applications/naive-bayes/naive-bayes.dml
@@ -33,42 +33,35 @@ fmt = ifdef($fmt, "text")
numClasses = $classes
D = read($X)
C = read($Y)
-laplace_correction = ifdef($laplace, 1)
+laplaceCorrection = ifdef($laplace, 1)
numRows = nrow(D)
numFeatures = ncol(D)
# Compute conditionals
-
# Compute the feature counts for each class
classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=as.integer(numClasses));
# Compute the total feature count for each class
# and add the number of features to this sum
# for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
# Compute class conditional probabilities
-ones = matrix(1, rows=1, cols=numFeatures)
-repClassSums = classSums %*% ones
-class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
# Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
-class_prior = class_counts / numRows;
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=as.integer(numClasses))
+classPrior = classCounts / numRows;
# Compute accuracy on training set
-ones = matrix(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_probs = D_w_ones %*% t(log(model))
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+logProbs = D %*% t(log(classConditionals)) + t(log(classPrior));
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
acc_str = "Training Accuracy (%): " + acc
print(acc_str)
write(acc_str, $accuracy)
# write out the model
-write(class_prior, $prior, format=fmt);
-write(class_conditionals, $conditionals, format=fmt);
+write(classPrior, $prior, format=fmt);
+write(classConditionals, $conditionals, format=fmt);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/f514f821/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/naive-bayes/naive-bayes.pydml b/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
index 073e1cb..25fbf84 100644
--- a/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
+++ b/src/test/scripts/applications/naive-bayes/naive-bayes.pydml
@@ -33,44 +33,39 @@ fmt = ifdef($fmt, "text")
numClasses = $classes
D = load($X)
C = load($Y)
-laplace_correction = ifdef($laplace, 1)
+laplaceCorrection = ifdef($laplace, 1)
numRows = nrow(D)
numFeatures = ncol(D)
# Compute conditionals
-
# Compute the feature counts for each class
classFeatureCounts = aggregate(target=D, groups=C, fn="sum", ngroups=numClasses);
# Compute the total feature count for each class
# and add the number of features to this sum
# for subsequent regularization (Laplace's rule)
-classSums = rowSums(classFeatureCounts) + numFeatures*laplace_correction
+classSums = rowSums(classFeatureCounts) + numFeatures*laplaceCorrection
# Compute class conditional probabilities
-ones = full(1, rows=1, cols=numFeatures)
-repClassSums = dot(classSums, ones)
-class_conditionals = (classFeatureCounts + laplace_correction) / repClassSums
+classConditionals = (classFeatureCounts + laplaceCorrection) / classSums
# Compute class priors
-class_counts = aggregate(target=C, groups=C, fn="count", ngroups=numClasses)
-class_prior = class_counts / numRows
+classCounts = aggregate(target=C, groups=C, fn="count", ngroups=numClasses)
+classPrior = classCounts / numRows
# Compute accuracy on training set
-ones = full(1, rows=numRows, cols=1)
-D_w_ones = cbind(D, ones)
-model = cbind(class_conditionals, class_prior)
-log_model = log(model)
-transpose_log_model = log_model.transpose()
-log_probs = dot(D_w_ones, transpose_log_model)
-pred = rowIndexMax(log_probs)
-acc = sum(ppred(pred, C, "==")) / numRows * 100
+lmodel1 = log(classConditionals)
+lmodel2 = log(classPrior)
+tlmodel1 = lmodel1.transpose()
+tlmodel2 = lmodel2.transpose()
+logProbs = dot(D, tlmodel1) + tlmodel2
+acc = sum(rowIndexMax(logProbs) == C) / numRows * 100
acc_str = "Training Accuracy (%): " + acc
print(acc_str)
save(acc_str, $accuracy)
# write out the model
-save(class_prior, $prior, format=fmt)
-save(class_conditionals, $conditionals, format=fmt)
+save(classPrior, $prior, format=fmt)
+save(classConditionals, $conditionals, format=fmt)
[4/4] incubator-systemml git commit: New spark map-grouped-aggregate
(compiler/runtime), for naive-bayes
Posted by mb...@apache.org.
New spark map-grouped-aggregate (compiler/runtime), for naive-bayes
Includes (1) refactoring for code reuse across Spark/MapReduce, and (2)
additional cleanup of instruction parsing (SP instruction parser).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ff2aea54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ff2aea54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ff2aea54
Branch: refs/heads/master
Commit: ff2aea54251948add5fd17c30a8c53536828d512
Parents: 3ea3cdb
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 18:22:44 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 18:22:44 2016 -0800
----------------------------------------------------------------------
.../sysml/hops/ParameterizedBuiltinOp.java | 22 +-
.../apache/sysml/lops/GroupedAggregateM.java | 35 +++-
.../instructions/SPInstructionParser.java | 39 ++--
.../mr/GroupedAggregateMInstruction.java | 33 +--
.../spark/AppendGAlignedSPInstruction.java | 3 +-
.../spark/AppendGSPInstruction.java | 3 +-
.../spark/AppendMSPInstruction.java | 3 +-
.../spark/AppendRSPInstruction.java | 3 +-
.../spark/BuiltinBinarySPInstruction.java | 3 +-
.../spark/BuiltinUnarySPInstruction.java | 3 +-
.../spark/MatrixReshapeSPInstruction.java | 3 +-
.../ParameterizedBuiltinSPInstruction.java | 203 +++++++++++++------
.../spark/QuantilePickSPInstruction.java | 3 +-
.../spark/QuantileSortSPInstruction.java | 3 +-
.../instructions/spark/RandSPInstruction.java | 3 +-
.../instructions/spark/WriteSPInstruction.java | 3 +-
.../matrix/data/OperationsOnMatrixValues.java | 41 ++++
17 files changed, 267 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 466d008..3a8445f 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -375,7 +375,7 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
}
else //CP/Spark
{
- GroupedAggregate grp_agg = null;
+ Lop grp_agg = null;
if( et == ExecType.CP)
{
@@ -391,9 +391,23 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop
OptimizerUtils.checkSparkBroadcastMemoryBudget( groups.getDim1(), groups.getDim2(),
groups.getRowsInBlock(), groups.getColsInBlock(), groups.getNnz()) );
- grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);
- grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
- setRequiresReblock( true );
+ if( broadcastGroups //mapgroupedagg
+ && getInput().get(_paramIndexMap.get(Statement.GAGG_FN)) instanceof LiteralOp
+ && ((LiteralOp)getInput().get(_paramIndexMap.get(Statement.GAGG_FN))).getStringValue().equals("sum")
+ && inputlops.get(Statement.GAGG_NUM_GROUPS) != null )
+ {
+ Hop target = getInput().get(_paramIndexMap.get(Statement.GAGG_TARGET));
+
+ grp_agg = new GroupedAggregateM(inputlops, getDataType(), getValueType(), true, ExecType.SPARK);
+ grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, target.getRowsInBlock(), target.getColsInBlock(), -1);
+ //no reblock required (directly output binary block)
+ }
+ else //groupedagg (w/ or w/o broadcast)
+ {
+ grp_agg = new GroupedAggregate(inputlops, getDataType(), getValueType(), et, broadcastGroups);
+ grp_agg.getOutputParameters().setDimensions(outputDim1, outputDim2, -1, -1, -1);
+ setRequiresReblock( true );
+ }
}
setLineNumbers(grp_agg);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java b/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
index 5edba62..2d73abc 100644
--- a/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
+++ b/src/main/java/org/apache/sysml/lops/GroupedAggregateM.java
@@ -68,13 +68,25 @@ public class GroupedAggregateM extends Lop
addInput(inputParameterLops.get(Statement.GAGG_GROUPS));
inputParameterLops.get(Statement.GAGG_GROUPS).addOutput(this);
- //setup MR parameters
- boolean breaksAlignment = true;
- boolean aligner = false;
- boolean definesMRJob = false;
- lps.addCompatibility(JobType.GMR);
- lps.addCompatibility(JobType.DATAGEN);
- lps.setProperties( inputs, ExecType.MR, ExecLocation.Map, breaksAlignment, aligner, definesMRJob );
+ if( et == ExecType.MR )
+ {
+ //setup MR parameters
+ boolean breaksAlignment = true;
+ boolean aligner = false;
+ boolean definesMRJob = false;
+ lps.addCompatibility(JobType.GMR);
+ lps.addCompatibility(JobType.DATAGEN);
+ lps.setProperties( inputs, ExecType.MR, ExecLocation.Map, breaksAlignment, aligner, definesMRJob );
+ }
+ else //SPARK
+ {
+ //setup Spark parameters
+ boolean breaksAlignment = false;
+ boolean aligner = false;
+ boolean definesMRJob = false;
+ lps.addCompatibility(JobType.INVALID);
+ lps.setProperties( inputs, et, ExecLocation.ControlProgram, breaksAlignment, aligner, definesMRJob );
+ }
}
@Override
@@ -85,6 +97,15 @@ public class GroupedAggregateM extends Lop
@Override
public String getInstructions(int input1, int input2, int output)
{
+ return getInstructions(
+ String.valueOf(input1),
+ String.valueOf(input2),
+ String.valueOf(output) );
+ }
+
+ @Override
+ public String getInstructions(String input1, String input2, String output)
+ {
StringBuilder sb = new StringBuilder();
sb.append( getExecType() );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
index 1f5f961..2683c43 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/SPInstructionParser.java
@@ -201,11 +201,12 @@ public class SPInstructionParser extends InstructionParser
String2SPInstructionType.put( "sel+", SPINSTRUCTION_TYPE.BuiltinUnary);
// Parameterized Builtin Functions
- String2SPInstructionType.put( "groupedagg" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
- String2SPInstructionType.put( "rmempty" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
- String2SPInstructionType.put( "replace" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
- String2SPInstructionType.put( "rexpand" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
- String2SPInstructionType.put( "transform" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "groupedagg" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "mapgroupedagg", SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "rmempty" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "replace" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "rexpand" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
+ String2SPInstructionType.put( "transform" , SPINSTRUCTION_TYPE.ParameterizedBuiltin);
String2SPInstructionType.put( "mappend", SPINSTRUCTION_TYPE.MAppend);
String2SPInstructionType.put( "rappend", SPINSTRUCTION_TYPE.RAppend);
@@ -338,10 +339,10 @@ public class SPInstructionParser extends InstructionParser
if ( parts[0].equals("log") || parts[0].equals("log_nz") ) {
if ( parts.length == 3 ) {
// B=log(A), y=log(x)
- return (SPInstruction) BuiltinUnarySPInstruction.parseInstruction(str);
+ return BuiltinUnarySPInstruction.parseInstruction(str);
} else if ( parts.length == 4 ) {
// B=log(A,10), y=log(x,10)
- return (SPInstruction) BuiltinBinarySPInstruction.parseInstruction(str);
+ return BuiltinBinarySPInstruction.parseInstruction(str);
}
}
else {
@@ -349,40 +350,40 @@ public class SPInstructionParser extends InstructionParser
}
case BuiltinBinary:
- return (SPInstruction) BuiltinBinarySPInstruction.parseInstruction(str);
+ return BuiltinBinarySPInstruction.parseInstruction(str);
case BuiltinUnary:
- return (SPInstruction) BuiltinUnarySPInstruction.parseInstruction(str);
+ return BuiltinUnarySPInstruction.parseInstruction(str);
case ParameterizedBuiltin:
- return (SPInstruction) ParameterizedBuiltinSPInstruction.parseInstruction(str);
+ return ParameterizedBuiltinSPInstruction.parseInstruction(str);
case MatrixReshape:
- return (SPInstruction) MatrixReshapeSPInstruction.parseInstruction(str);
+ return MatrixReshapeSPInstruction.parseInstruction(str);
case MAppend:
- return (SPInstruction) AppendMSPInstruction.parseInstruction(str);
+ return AppendMSPInstruction.parseInstruction(str);
case GAppend:
- return (SPInstruction) AppendGSPInstruction.parseInstruction(str);
+ return AppendGSPInstruction.parseInstruction(str);
case GAlignedAppend:
- return (SPInstruction) AppendGAlignedSPInstruction.parseInstruction(str);
+ return AppendGAlignedSPInstruction.parseInstruction(str);
case RAppend:
- return (SPInstruction) AppendRSPInstruction.parseInstruction(str);
+ return AppendRSPInstruction.parseInstruction(str);
case Rand:
- return (SPInstruction) RandSPInstruction.parseInstruction(str);
+ return RandSPInstruction.parseInstruction(str);
case QSort:
- return (SPInstruction) QuantileSortSPInstruction.parseInstruction(str);
+ return QuantileSortSPInstruction.parseInstruction(str);
case QPick:
- return (SPInstruction) QuantilePickSPInstruction.parseInstruction(str);
+ return QuantilePickSPInstruction.parseInstruction(str);
case Write:
- return (SPInstruction) WriteSPInstruction.parseInstruction(str);
+ return WriteSPInstruction.parseInstruction(str);
case CumsumAggregate:
return CumulativeAggregateSPInstruction.parseInstruction(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
index 4130fd9..34f0945 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/mr/GroupedAggregateMInstruction.java
@@ -30,6 +30,7 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysml.runtime.matrix.mapred.CachedValueMap;
import org.apache.sysml.runtime.matrix.mapred.DistributedCacheInput;
import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
@@ -90,37 +91,19 @@ public class GroupedAggregateMInstruction extends BinaryMRInstructionBase implem
//get all inputs
MatrixIndexes ix = in1.getIndexes();
- MatrixBlock target = (MatrixBlock)in1.getValue();
MatrixBlock groups = (MatrixBlock)dcInput.getDataBlock((int)ix.getRowIndex(), 1).getValue();
- //execute grouped aggregate operations
- MatrixBlock out = groups.groupedAggOperations(target, null, new MatrixBlock(), _ngroups, getOperator());
-
//output blocked result
int brlen = dcInput.getNumRowsPerBlock();
int bclen = dcInput.getNumColsPerBlock();
- if( out.getNumRows()<=brlen && out.getNumColumns()<=bclen )
- {
- //single output block
- cachedValues.add(output, new IndexedMatrixValue(new MatrixIndexes(1,ix.getColumnIndex()), out));
- }
- else
- {
- //multiple output blocks (by op def, single column block )
- for(int blockRow = 0; blockRow < (int)Math.ceil(out.getNumRows()/(double)brlen); blockRow++)
- {
- int maxRow = (blockRow*brlen + brlen < out.getNumRows()) ? brlen : out.getNumRows() - blockRow*brlen;
- int row_offset = blockRow*brlen;
-
- //copy submatrix to block
- MatrixBlock tmp = out.sliceOperations( row_offset, row_offset+maxRow-1,
- 0, out.getNumColumns()-1, new MatrixBlock() );
-
- //append block to result cache
- cachedValues.add(output, new IndexedMatrixValue(
- new MatrixIndexes(blockRow+1,ix.getColumnIndex()), tmp));
- }
+ //execute map grouped aggregate operations
+ ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>();
+ OperationsOnMatrixValues.performMapGroupedAggregate(getOperator(), in1, groups, _ngroups, brlen, bclen, outlist);
+
+ //output all result blocks
+ for( IndexedMatrixValue out : outlist ) {
+ cachedValues.add(output, out);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
index 5a930da..f1ddcdf 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGAlignedSPInstruction.java
@@ -29,7 +29,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -49,7 +48,7 @@ public class AppendGAlignedSPInstruction extends BinarySPInstruction
_cbind = cbind;
}
- public static Instruction parseInstruction ( String str )
+ public static AppendGAlignedSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
index ecf2bc3..30ca4f7 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendGSPInstruction.java
@@ -33,7 +33,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
@@ -54,7 +53,7 @@ public class AppendGSPInstruction extends BinarySPInstruction
_cbind = cbind;
}
- public static Instruction parseInstruction ( String str )
+ public static AppendGSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
index a1a0bf3..245f234 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendMSPInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.data.LazyIterableIterator;
@@ -59,7 +58,7 @@ public class AppendMSPInstruction extends BinarySPInstruction
_cbind = cbind;
}
- public static Instruction parseInstruction ( String str )
+ public static AppendMSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
index 7beba7d..e9e6e35 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/AppendRSPInstruction.java
@@ -29,7 +29,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.functionobjects.OffsetColumnIndex;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -48,7 +47,7 @@ public class AppendRSPInstruction extends BinarySPInstruction
_cbind = cbind;
}
- public static Instruction parseInstruction ( String str )
+ public static AppendRSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
index da85a89..01657ec 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinBinarySPInstruction.java
@@ -27,7 +27,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
@@ -50,7 +49,7 @@ public abstract class BuiltinBinarySPInstruction extends BinarySPInstruction
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public static Instruction parseInstruction ( String str )
+ public static BuiltinBinarySPInstruction parseInstruction ( String str )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
CPOperand in1 = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
index c9d5ab6..0bd272c 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/BuiltinUnarySPInstruction.java
@@ -25,7 +25,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.matrix.operators.Operator;
import org.apache.sysml.runtime.matrix.operators.UnaryOperator;
@@ -47,7 +46,7 @@ public abstract class BuiltinUnarySPInstruction extends UnarySPInstruction
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
- public static Instruction parseInstruction ( String str )
+ public static BuiltinUnarySPInstruction parseInstruction ( String str )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
CPOperand in = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
index 5d30c94..c2d180e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/MatrixReshapeSPInstruction.java
@@ -31,7 +31,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
@@ -70,7 +69,7 @@ public class MatrixReshapeSPInstruction extends UnarySPInstruction
* @return
* @throws DMLRuntimeException
*/
- public static Instruction parseInstruction ( String str )
+ public static MatrixReshapeSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index 505e232..f8e2669 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -29,6 +29,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import org.apache.sysml.lops.Lop;
+import org.apache.sysml.lops.PartialAggregate.CorrectionLocationType;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.parser.ParameterizedBuiltinFunctionExpression;
import org.apache.sysml.parser.Statement;
@@ -37,9 +38,9 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
import org.apache.sysml.runtime.functionobjects.ParameterizedBuiltin;
import org.apache.sysml.runtime.functionobjects.ValueFunction;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.mr.GroupedAggregateInstruction;
@@ -57,6 +58,7 @@ import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysml.runtime.matrix.data.WeightedCell;
import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
import org.apache.sysml.runtime.matrix.operators.AggregateOperator;
@@ -68,10 +70,10 @@ import org.apache.sysml.runtime.transform.DataTransform;
import org.apache.sysml.runtime.util.UtilFunctions;
public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
-{
-
- private int arity;
+{
protected HashMap<String,String> params;
+
+ //removeEmpty-specific attributes
private boolean _bRmEmptyBC = false;
public ParameterizedBuiltinSPInstruction(Operator op, HashMap<String,String> paramsMap, CPOperand out, String opcode, String istr, boolean bRmEmptyBC )
@@ -82,10 +84,6 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
_bRmEmptyBC = bRmEmptyBC;
}
- public int getArity() {
- return arity;
- }
-
public HashMap<String,String> getParams() { return params; }
public static HashMap<String, String> constructParameterMap(String[] params) {
@@ -102,70 +100,89 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
return paramMap;
}
- public static Instruction parseInstruction ( String str )
+ public static ParameterizedBuiltinSPInstruction parseInstruction ( String str )
throws DMLRuntimeException, DMLUnsupportedOperationException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
// first part is always the opcode
String opcode = parts[0];
- // last part is always the output
- CPOperand out = new CPOperand( parts[parts.length-1] );
- // process remaining parts and build a hash map
- HashMap<String,String> paramsMap = constructParameterMap(parts);
+ if( opcode.equalsIgnoreCase("mapgroupedagg") )
+ {
+ CPOperand target = new CPOperand( parts[1] );
+ CPOperand groups = new CPOperand( parts[2] );
+ CPOperand out = new CPOperand( parts[3] );
- // determine the appropriate value function
- ValueFunction func = null;
- if ( opcode.equalsIgnoreCase("groupedagg")) {
- // check for mandatory arguments
- String fnStr = paramsMap.get("fn");
- if ( fnStr == null )
- throw new DMLRuntimeException("Function parameter is missing in groupedAggregate.");
- if ( fnStr.equalsIgnoreCase("centralmoment") ) {
- if ( paramsMap.get("order") == null )
- throw new DMLRuntimeException("Mandatory \"order\" must be specified when fn=\"centralmoment\" in groupedAggregate.");
- }
+ HashMap<String,String> paramsMap = new HashMap<String, String>();
+ paramsMap.put(Statement.GAGG_TARGET, target.getName());
+ paramsMap.put(Statement.GAGG_GROUPS, groups.getName());
+ paramsMap.put(Statement.GAGG_NUM_GROUPS, parts[4]);
- Operator op = GroupedAggregateInstruction.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
- return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
- }
- else if( opcode.equalsIgnoreCase("rmempty") )
- {
- boolean bRmEmptyBC = false;
- if(parts.length > 6)
- bRmEmptyBC = (parts[5].compareTo("true") == 0)?true:false;
-
- func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, bRmEmptyBC);
- }
- else if( opcode.equalsIgnoreCase("rexpand") )
- {
- func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
- }
- else if( opcode.equalsIgnoreCase("replace") )
- {
- func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+ Operator op = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), true, CorrectionLocationType.LASTCOLUMN);
+
+ return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
}
- else if ( opcode.equalsIgnoreCase("transform") )
+ else
{
- func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- String specFile = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC);
- String applyTxPath = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
- if ( specFile != null && applyTxPath != null)
- throw new DMLRuntimeException(
- "Invalid parameters to transform(). Only one of '"
- + ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC
- + "' or '"
- + ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD
- + "' can be specified.");
- return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
- }
- else {
- throw new DMLRuntimeException("Unknown opcode (" + opcode + ") for ParameterizedBuiltin Instruction.");
- }
+ // last part is always the output
+ CPOperand out = new CPOperand( parts[parts.length-1] );
+ // process remaining parts and build a hash map
+ HashMap<String,String> paramsMap = constructParameterMap(parts);
+
+ // determine the appropriate value function
+ ValueFunction func = null;
+
+ if ( opcode.equalsIgnoreCase("groupedagg")) {
+ // check for mandatory arguments
+ String fnStr = paramsMap.get("fn");
+ if ( fnStr == null )
+ throw new DMLRuntimeException("Function parameter is missing in groupedAggregate.");
+ if ( fnStr.equalsIgnoreCase("centralmoment") ) {
+ if ( paramsMap.get("order") == null )
+ throw new DMLRuntimeException("Mandatory \"order\" must be specified when fn=\"centralmoment\" in groupedAggregate.");
+ }
+
+ Operator op = GroupedAggregateInstruction.parseGroupedAggOperator(fnStr, paramsMap.get("order"));
+ return new ParameterizedBuiltinSPInstruction(op, paramsMap, out, opcode, str, false);
+ }
+ else if( opcode.equalsIgnoreCase("rmempty") )
+ {
+ boolean bRmEmptyBC = false;
+ if(parts.length > 6)
+ bRmEmptyBC = (parts[5].compareTo("true") == 0)?true:false;
+
+ func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+ return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, bRmEmptyBC);
+ }
+ else if( opcode.equalsIgnoreCase("rexpand") )
+ {
+ func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+ return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+ }
+ else if( opcode.equalsIgnoreCase("replace") )
+ {
+ func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+ return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+ }
+ else if ( opcode.equalsIgnoreCase("transform") )
+ {
+ func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
+ String specFile = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC);
+ String applyTxPath = paramsMap.get(ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD);
+ if ( specFile != null && applyTxPath != null)
+ throw new DMLRuntimeException(
+ "Invalid parameters to transform(). Only one of '"
+ + ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_TXSPEC
+ + "' or '"
+ + ParameterizedBuiltinFunctionExpression.TF_FN_PARAM_APPLYMTD
+ + "' can be specified.");
+ return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str, false);
+ }
+ else {
+ throw new DMLRuntimeException("Unknown opcode (" + opcode + ") for ParameterizedBuiltin Instruction.");
+ }
+ }
}
@@ -177,7 +194,31 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
String opcode = getOpcode();
//opcode guaranteed to be a valid opcode (see parsing)
- if ( opcode.equalsIgnoreCase("groupedagg") )
+ if( opcode.equalsIgnoreCase("mapgroupedagg") )
+ {
+ //get input rdd handle
+ String targetVar = params.get(Statement.GAGG_TARGET);
+ String groupsVar = params.get(Statement.GAGG_GROUPS);
+ JavaPairRDD<MatrixIndexes,MatrixBlock> target = sec.getBinaryBlockRDDHandleForVariable(targetVar);
+ PartitionedBroadcastMatrix groups = sec.getBroadcastForVariable(groupsVar);
+ MatrixCharacteristics mc1 = sec.getMatrixCharacteristics( targetVar );
+ MatrixCharacteristics mcOut = sec.getMatrixCharacteristics(output.getName());
+ CPOperand ngrpOp = new CPOperand(params.get(Statement.GAGG_NUM_GROUPS));
+ int ngroups = (int)sec.getScalarInput(ngrpOp.getName(), ngrpOp.getValueType(), ngrpOp.isLiteral()).getLongValue();
+
+ //execute map grouped aggregate
+ JavaPairRDD<MatrixIndexes, MatrixBlock> out =
+ target.flatMapToPair(new RDDMapGroupedAggFunction(groups, _optr,
+ ngroups, mc1.getRowsPerBlock(), mc1.getColsPerBlock()));
+ out = RDDAggregateUtils.sumByKeyStable(out);
+
+ //updated characteristics and handle outputs
+ mcOut.set(ngroups, mc1.getCols(), mc1.getRowsPerBlock(), mc1.getColsPerBlock(), -1);
+ sec.setRDDHandleForVariable(output.getName(), out);
+ sec.addLineageRDD( output.getName(), targetVar );
+ sec.addLineageBroadcast( output.getName(), groupsVar );
+ }
+ else if ( opcode.equalsIgnoreCase("groupedagg") )
{
boolean broadcastGroups = Boolean.parseBoolean(params.get("broadcast"));
@@ -519,6 +560,44 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
}
+ public static class RDDMapGroupedAggFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes,MatrixBlock>,MatrixIndexes,MatrixBlock>
+ {
+ private static final long serialVersionUID = 6795402640178679851L;
+
+ private PartitionedBroadcastMatrix _pbm = null;
+ private Operator _op = null;
+ private int _ngroups = -1;
+ private int _brlen = -1;
+ private int _bclen = -1;
+
+ public RDDMapGroupedAggFunction(PartitionedBroadcastMatrix pbm, Operator op, int ngroups, int brlen, int bclen)
+ {
+ _pbm = pbm;
+ _op = op;
+ _ngroups = ngroups;
+ _brlen = brlen;
+ _bclen = bclen;
+ }
+
+ @Override
+ public Iterable<Tuple2<MatrixIndexes, MatrixBlock>> call(Tuple2<MatrixIndexes, MatrixBlock> arg0)
+ throws Exception
+ {
+ //get all inputs
+ MatrixIndexes ix = arg0._1();
+ MatrixBlock target = arg0._2();
+ MatrixBlock groups = _pbm.getMatrixBlock((int)ix.getRowIndex(), 1);
+
+ //execute map grouped aggregate operations
+ IndexedMatrixValue in1 = SparkUtils.toIndexedMatrixBlock(ix, target);
+ ArrayList<IndexedMatrixValue> outlist = new ArrayList<IndexedMatrixValue>();
+ OperationsOnMatrixValues.performMapGroupedAggregate(_op, in1, groups, _ngroups, _brlen, _bclen, outlist);
+
+ //output all result blocks
+ return SparkUtils.fromIndexedMatrixBlock(outlist);
+ }
+ }
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
index a4f7a83..5cea24f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantilePickSPInstruction.java
@@ -32,7 +32,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.cp.DoubleObject;
@@ -66,7 +65,7 @@ public class QuantilePickSPInstruction extends BinarySPInstruction
* @return
* @throws DMLRuntimeException
*/
- public static Instruction parseInstruction ( String str )
+ public static QuantilePickSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
index abf837b..793354f 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/QuantileSortSPInstruction.java
@@ -28,7 +28,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.utils.RDDSortUtils;
@@ -59,7 +58,7 @@ public class QuantileSortSPInstruction extends UnarySPInstruction
_sptype = SPINSTRUCTION_TYPE.QSort;
}
- public static Instruction parseInstruction ( String str )
+ public static QuantileSortSPInstruction parseInstruction ( String str )
throws DMLRuntimeException {
CPOperand in1 = new CPOperand("", ValueType.UNKNOWN, DataType.UNKNOWN);
CPOperand in2 = null;
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
index 3b4f798..b1eca1e 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/RandSPInstruction.java
@@ -45,7 +45,6 @@ import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtils;
@@ -197,7 +196,7 @@ public class RandSPInstruction extends UnarySPInstruction
* @return
* @throws DMLRuntimeException
*/
- public static Instruction parseInstruction(String str)
+ public static RandSPInstruction parseInstruction(String str)
throws DMLRuntimeException
{
String[] s = InstructionUtils.getInstructionPartsWithValueType ( str );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
index 47cdfd1..3a6ad01 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/WriteSPInstruction.java
@@ -34,7 +34,6 @@ import org.apache.sysml.runtime.DMLUnsupportedOperationException;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
-import org.apache.sysml.runtime.instructions.Instruction;
import org.apache.sysml.runtime.instructions.InstructionUtils;
import org.apache.sysml.runtime.instructions.cp.CPOperand;
import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
@@ -72,7 +71,7 @@ public class WriteSPInstruction extends SPInstruction
formatProperties = null; // set in case of csv
}
- public static Instruction parseInstruction ( String str )
+ public static WriteSPInstruction parseInstruction ( String str )
throws DMLRuntimeException
{
String[] parts = InstructionUtils.getInstructionPartsWithValueType ( str );
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ff2aea54/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 65e2c22..485b559 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -350,4 +350,45 @@ public class OperationsOnMatrixValues
//execute actual slice operation
in.getValue().sliceOperations(outlist, tmpRange, rowCut, colCut, brlen, bclen, boundaryRlen, boundaryClen);
}
+
+ /**
+ * Performs a map-side grouped aggregate over a single target block.
+ * @param op aggregate operator applied per group
+ * @param inTarget indexed target matrix block
+ * @param groups groups block (row-to-group mapping); ngroups: number of groups
+ * @param brlen rows per block; bclen: columns per block
+ * @param outlist receives the resulting indexed output blocks
+ * @throws DMLRuntimeException
+ * @throws DMLUnsupportedOperationException
+ */
+ public static void performMapGroupedAggregate( Operator op, IndexedMatrixValue inTarget, MatrixBlock groups, int ngroups, int brlen, int bclen, ArrayList<IndexedMatrixValue> outlist ) throws DMLRuntimeException, DMLUnsupportedOperationException
+ {
+ MatrixIndexes ix = inTarget.getIndexes();
+ MatrixBlock target = (MatrixBlock)inTarget.getValue();
+
+ //execute grouped aggregate operations
+ MatrixBlock out = groups.groupedAggOperations(target, null, new MatrixBlock(), ngroups, op);
+
+ if( out.getNumRows()<=brlen && out.getNumColumns()<=bclen )
+ {
+ //single output block
+ outlist.add( new IndexedMatrixValue(new MatrixIndexes(1,ix.getColumnIndex()), out) );
+ }
+ else
+ {
+ //multiple output blocks (by op def, single column block )
+ for(int blockRow = 0; blockRow < (int)Math.ceil(out.getNumRows()/(double)brlen); blockRow++)
+ {
+ int maxRow = (blockRow*brlen + brlen < out.getNumRows()) ? brlen : out.getNumRows() - blockRow*brlen;
+ int row_offset = blockRow*brlen;
+
+ //copy submatrix to block
+ MatrixBlock tmp = out.sliceOperations( row_offset, row_offset+maxRow-1,
+ 0, out.getNumColumns()-1, new MatrixBlock() );
+
+ //append block to result cache
+ outlist.add(new IndexedMatrixValue(new MatrixIndexes(blockRow+1,ix.getColumnIndex()), tmp));
+ }
+ }
+ }
}
[2/4] incubator-systemml git commit: New static simplification
rewrite 'simplify unary-over-ppred operations'
Posted by mb...@apache.org.
New static simplification rewrite 'simplify unary-over-ppred operations'
Examples are abs(ppred()) -> ppred() and similarly ceil, floor, and
round over ppred because ppred guarantees 0/1 indicator outputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/ba08b4c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/ba08b4c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/ba08b4c1
Branch: refs/heads/master
Commit: ba08b4c1b25f0a66e24cc8bf5ebf0c5f7f71581a
Parents: f514f82
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:52:59 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:52:59 2016 -0800
----------------------------------------------------------------------
.../RewriteAlgebraicSimplificationStatic.java | 35 +++++++++++++++++++-
1 file changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/ba08b4c1/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
index adda6a2..5483bed 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/RewriteAlgebraicSimplificationStatic.java
@@ -138,7 +138,8 @@ public class RewriteAlgebraicSimplificationStatic extends HopRewriteRule
hi = simplifyDistributiveBinaryOperation(hop, hi, i);//e.g., (X-Y*X) -> (1-Y)*X
hi = simplifyBushyBinaryOperation(hop, hi, i); //e.g., (X*(Y*(Z%*%v))) -> (X*Y)*(Z%*%v)
hi = simplifyUnaryAggReorgOperation(hop, hi, i); //e.g., sum(t(X)) -> sum(X)
- hi = simplifyTransposedAppend(hop, hi, i); //e.g., t(cbind(t(A),t(B))) -> rbind(A,B);
+ hi = simplifyUnaryPPredOperation(hop, hi, i); //e.g., abs(ppred()) -> ppred(), others: round, ceil, floor
+ hi = simplifyTransposedAppend(hop, hi, i); //e.g., t(cbind(t(A),t(B))) -> rbind(A,B);
hi = fuseBinarySubDAGToUnaryOperation(hop, hi, i); //e.g., X*(1-X)-> sprop(X) || 1/(1+exp(-X)) -> sigmoid(X) || X*(X>0) -> selp(X)
hi = simplifyTraceMatrixMult(hop, hi, i); //e.g., trace(X%*%Y)->sum(X*t(Y));
hi = simplifySlicedMatrixMult(hop, hi, i); //e.g., (X%*%Y)[1,1] -> X[1,] %*% Y[,1];
@@ -744,6 +745,38 @@ public class RewriteAlgebraicSimplificationStatic extends HopRewriteRule
* @param pos
* @return
*/
+ private Hop simplifyUnaryPPredOperation( Hop parent, Hop hi, int pos )
+ {
+ if( hi instanceof UnaryOp && hi.getDataType()==DataType.MATRIX //unaryop
+ && hi.getInput().get(0) instanceof BinaryOp //binaryop - ppred
+ && ((BinaryOp)hi.getInput().get(0)).isPPredOperation() )
+ {
+ UnaryOp uop = (UnaryOp) hi; //valid unary op
+ if( uop.getOp()==OpOp1.ABS || uop.getOp()==OpOp1.CEIL
+ || uop.getOp()==OpOp1.FLOOR || uop.getOp()==OpOp1.ROUND )
+ {
+ //clear link unary-binary
+ Hop input = uop.getInput().get(0);
+ HopRewriteUtils.removeAllChildReferences(hi);
+
+ HopRewriteUtils.removeChildReferenceByPos(parent, hi, pos);
+ HopRewriteUtils.addChildReference(parent, input, pos);
+ hi = input;
+
+ LOG.debug("Applied simplifyUnaryPPredOperation.");
+ }
+ }
+
+ return hi;
+ }
+
+ /**
+ *
+ * @param parent
+ * @param hi
+ * @param pos
+ * @return
+ */
private Hop simplifyTransposedAppend( Hop parent, Hop hi, int pos )
{
//e.g., t(cbind(t(A),t(B))) --> rbind(A,B), t(rbind(t(A),t(B))) --> cbind(A,B)
[3/4] incubator-systemml git commit: Cleanup lop piggybacking (unused
code, visibility, logging, iterators)
Posted by mb...@apache.org.
Cleanup lop piggybacking (unused code, visibility, logging, iterators)
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/3ea3cdbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/3ea3cdbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/3ea3cdbd
Branch: refs/heads/master
Commit: 3ea3cdbd4cd83b0dc144f232a69b51cc25bc82a9
Parents: ba08b4c
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Sat Jan 2 16:54:33 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Sat Jan 2 16:54:33 2016 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/lops/compile/Dag.java | 872 ++++---------------
1 file changed, 149 insertions(+), 723 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/3ea3cdbd/src/main/java/org/apache/sysml/lops/compile/Dag.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/compile/Dag.java b/src/main/java/org/apache/sysml/lops/compile/Dag.java
index 3b4cfdb..f5693ef 100644
--- a/src/main/java/org/apache/sysml/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysml/lops/compile/Dag.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -92,7 +91,6 @@ import org.apache.sysml.runtime.matrix.sort.PickFromCompactInputFormat;
*/
public class Dag<N extends Lop>
{
-
private static final Log LOG = LogFactory.getLog(Dag.class.getName());
private static final int CHILD_BREAKS_ALIGNMENT = 2;
@@ -106,7 +104,7 @@ public class Dag<N extends Lop>
private int total_reducers = -1;
private String scratch = "";
- private String scratchFilePath = "";
+ private String scratchFilePath = null;
private double gmrMapperFootprint = 0;
@@ -185,7 +183,7 @@ public class Dag<N extends Lop>
}
private String getFilePath() {
- if ( scratchFilePath.equalsIgnoreCase("") ) {
+ if ( scratchFilePath == null ) {
scratchFilePath = scratch + Lop.FILE_SEPARATOR
+ Lop.PROCESS_PREFIX + DMLScript.getUUID()
+ Lop.FILE_SEPARATOR + Lop.FILE_SEPARATOR
@@ -209,6 +207,20 @@ public class Dag<N extends Lop>
}
/**
+ * Method to add a node to the DAG.
+ *
+ * @param node the node to add to the DAG
+ * @return true if the node was newly added, false if it was already present.
+ */
+
+ public boolean addNode(N node) {
+ if (nodes.contains(node))
+ return false;
+ nodes.add(node);
+ return true;
+ }
+
+ /**
*
* @param config
* @return
@@ -262,83 +274,6 @@ public class Dag<N extends Lop>
}
- /**
- * Method to remove transient reads that do not have a transient write
- *
- * @param nodeV
- * @param inst
- * @throws DMLUnsupportedOperationException
- * @throws DMLRuntimeException
- */
- @SuppressWarnings("unused")
- private void deleteUnwantedTransientReadVariables(ArrayList<N> nodeV,
- ArrayList<Instruction> inst) throws DMLRuntimeException,
- DMLUnsupportedOperationException {
- HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
-
- LOG.trace("In delete unwanted variables");
-
- // first capture all transient read variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
-
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.READ) {
- labelNodeMapping.put(node.getOutputParameters().getLabel(),
- node);
- }
- }
-
- // generate delete instructions for all transient read variables without
- // a transient write
- // first capture all transient write variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
-
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.WRITE ) {
- if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data) {
- // this transient write is NOT a result of actual computation, but is simple copy.
- // in such a case, preserve the input variable so that appropriate copy instruction (cpvar or GMR) is generated
- // therefore, remove the input label from labelNodeMapping
- labelNodeMapping.remove(node.getInputs().get(0).getOutputParameters().getLabel());
- }
- else {
- if(labelNodeMapping.containsKey(node.getOutputParameters().getLabel()))
- // corresponding transient read exists (i.e., the variable is updated)
- // in such a case, generate rmvar instruction so that OLD data gets deleted
- labelNodeMapping.remove(node.getOutputParameters().getLabel());
- }
- }
- }
-
- // generate RM instructions
- Instruction rm_inst = null;
- for( Entry<String, N> e : labelNodeMapping.entrySet() ) {
- String label = e.getKey();
- N node = e.getValue();
-
- if (((Data) node).getDataType() == DataType.SCALAR) {
- // if(DEBUG)
- // System.out.println("rmvar" + Lops.OPERAND_DELIMITOR + label);
- // inst.add(new VariableSimpleInstructions("rmvar" +
- // Lops.OPERAND_DELIMITOR + label));
-
- } else {
- rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
- rm_inst.setLocation(node);
-
- if( LOG.isTraceEnabled() )
- LOG.trace(rm_inst.toString());
- inst.add(rm_inst);
-
- }
- }
-
- }
-
private void deleteUpdatedTransientReadVariables(StatementBlock sb, ArrayList<N> nodeV,
ArrayList<Instruction> inst) throws DMLRuntimeException,
DMLUnsupportedOperationException {
@@ -348,20 +283,6 @@ public class Dag<N extends Lop>
LOG.trace("In delete updated variables");
- /*
- Set<String> in = sb.liveIn().getVariables().keySet();
- Set<String> out = sb.liveOut().getVariables().keySet();
- Set<String> updated = sb.variablesUpdated().getVariables().keySet();
-
- Set<String> intersection = in;
- intersection.retainAll(out);
- intersection.retainAll(updated);
-
- for (String var : intersection) {
- inst.add(VariableCPInstruction.prepareRemoveInstruction(var));
- }
- */
-
// CANDIDATE list of variables which could have been updated in this statement block
HashMap<String, N> labelNodeMapping = new HashMap<String, N>();
@@ -371,8 +292,7 @@ public class Dag<N extends Lop>
HashMap<String, N> updatedLabelsLineNum = new HashMap<String, N>();
// first capture all transient read variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
+ for ( N node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -399,8 +319,7 @@ public class Dag<N extends Lop>
}
// capture updated transient write variables
- for (int i = 0; i < nodeV.size(); i++) {
- N node = nodeV.get(i);
+ for ( N node : nodeV ) {
if (node.getExecLocation() == ExecLocation.Data
&& ((Data) node).isTransient()
@@ -422,8 +341,8 @@ public class Dag<N extends Lop>
rm_inst = VariableCPInstruction.prepareRemoveInstruction(label);
rm_inst.setLocation(updatedLabelsLineNum.get(label));
-
- LOG.trace(rm_inst.toString());
+ if( LOG.isTraceEnabled() )
+ LOG.trace(rm_inst.toString());
inst.add(rm_inst);
}
@@ -479,23 +398,6 @@ public class Dag<N extends Lop>
}*/
}
- /**
- * Method to add a node to the DAG.
- *
- * @param node
- * @return true if node was not already present, false if not.
- */
-
- public boolean addNode(N node) {
- if (nodes.contains(node))
- return false;
- else {
- nodes.add(node);
- return true;
- }
-
- }
-
private ArrayList<ArrayList<N>> createNodeVectors(int size) {
ArrayList<ArrayList<N>> arr = new ArrayList<ArrayList<N>>();
@@ -508,8 +410,8 @@ public class Dag<N extends Lop>
}
private void clearNodeVectors(ArrayList<ArrayList<N>> arr) {
- for (int i = 0; i < arr.size(); i++) {
- arr.get(i).clear();
+ for (ArrayList<N> tmp : arr) {
+ tmp.clear();
}
}
@@ -519,9 +421,10 @@ public class Dag<N extends Lop>
int base = jt.getBase();
for (int i = from; i < to; i++) {
- if ((nodes.get(i).getCompatibleJobs() & base) == 0) {
+ N node = nodes.get(i);
+ if ((node.getCompatibleJobs() & base) == 0) {
if( LOG.isTraceEnabled() )
- LOG.trace("Not compatible "+ nodes.get(i).toString());
+ LOG.trace("Not compatible "+ node.toString());
return false;
}
}
@@ -665,7 +568,6 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws LopsException
*/
-
private void handleSingleOutputJobs(ArrayList<N> execNodes,
ArrayList<ArrayList<N>> jobNodes, ArrayList<N> finishedNodes)
throws LopsException {
@@ -685,9 +587,7 @@ public class Dag<N extends Lop>
if (!jobNodes.get(jindex).isEmpty()) {
ArrayList<N> vec = jobNodes.get(jindex);
- // first find all nodes with more than one parent that is not
- // finished.
-
+ // first find all nodes with more than one parent that is not finished.
for (int i = 0; i < vec.size(); i++) {
N node = vec.get(i);
if (node.getExecLocation() == ExecLocation.MapOrReduce
@@ -705,24 +605,10 @@ public class Dag<N extends Lop>
}
}
}
- /*
- // Following condition will not work for MMRJ because execNodes may contain non-MapAndReduce
- // lops that are compatible with MMRJ (e.g., Data-WRITE)
- else if (!(node.getExecLocation() == ExecLocation.MapAndReduce
- && node.getType() == lopTypes[jobi])) {
- throw new LopsException(
- "Error in handleSingleOutputJobs(): encountered incompatible execLocation='"
- + node.getExecLocation() + "' for lop (ID="
- + node.getID() + ").");
- }
- */
}
- // need to redo all nodes in nodesWithOutput as well as their
- // children
-
- for (int i = 0; i < vec.size(); i++) {
- N node = vec.get(i);
+ // need to redo all nodes in nodesWithOutput as well as their children
+ for ( N node : vec ) {
if (node.getExecLocation() == ExecLocation.MapOrReduce
|| node.getExecLocation() == ExecLocation.Map) {
if (nodesWithUnfinishedOutputs.contains(node))
@@ -798,14 +684,6 @@ public class Dag<N extends Lop>
}
}
- /*private String getOutputFormat(Data node) {
- if(node.getFileFormatType() == FileFormatTypes.TEXT)
- return "textcell";
- else if ( node.getOutputParameters().isBlocked_representation() )
- return "binaryblock";
- else
- return "binarycell";
- }*/
/**
* Determine whether to send <code>node</code> to MR or to process it in the control program.
@@ -922,7 +800,6 @@ public class Dag<N extends Lop>
* @throws DMLUnsupportedOperationException
* @throws DMLRuntimeException
*/
-
@SuppressWarnings("unchecked")
private ArrayList<Instruction> doGreedyGrouping(StatementBlock sb, ArrayList<N> node_v)
throws LopsException, IOException, DMLRuntimeException,
@@ -940,7 +817,6 @@ public class Dag<N extends Lop>
ArrayList<ArrayList<N>> jobNodes = createNodeVectors(JobType.getNumJobTypes());
-
// list of instructions
ArrayList<Instruction> inst = new ArrayList<Instruction>();
@@ -949,9 +825,6 @@ public class Dag<N extends Lop>
ArrayList<Instruction> deleteInst = new ArrayList<Instruction>();
ArrayList<Instruction> endOfBlockInst = new ArrayList<Instruction>();
- // delete transient variables that are no longer needed
- //deleteUnwantedTransientReadVariables(node_v, deleteInst);
-
// remove files for transient reads that are updated.
deleteUpdatedTransientReadVariables(sb, node_v, writeInst);
@@ -1004,9 +877,6 @@ public class Dag<N extends Lop>
if( LOG.isTraceEnabled() )
LOG.trace(indent + "Queueing node "
+ node.toString() + " (code 2)");
- //for(N q: queuedNodes) {
- // LOG.trace(indent + " " + q.getType() + "," + q.getID());
- //}
queuedNodes.add(node);
// if node has more than two inputs,
@@ -1043,22 +913,6 @@ public class Dag<N extends Lop>
}
}
- /*if (node.getInputs().size() == 2) {
- int j1 = jobType(node.getInputs().get(0), jobNodes);
- int j2 = jobType(node.getInputs().get(1), jobNodes);
- if (j1 != -1 && j2 != -1 && j1 != j2) {
- LOG.trace(indent + "Queueing node "
- + node.toString() + " (code 3)");
-
- queuedNodes.add(node);
-
- removeNodesForNextIteration(node, finishedNodes,
- execNodes, queuedNodes, jobNodes);
-
- continue;
- }
- }*/
-
// See if this lop can be eliminated
// This check is for "aligner" lops (e.g., group)
boolean eliminate = false;
@@ -1267,13 +1121,6 @@ public class Dag<N extends Lop>
// reduce node, make sure no parent needs reduce, else queue
if (node.getExecLocation() == ExecLocation.MapAndReduce) {
-
- // boolean eliminate = false;
- // eliminate = canEliminateLop(node, execNodes);
- // if (eliminate || (!hasChildNode(node, execNodes,
- // ExecLocation.MapAndReduce)) &&
- // !hasMRJobChildNode(node,execNodes)) {
-
// TODO: statiko -- keep the middle condition
// discuss about having a lop that is MapAndReduce but does
// not define a job
@@ -1282,14 +1129,7 @@ public class Dag<N extends Lop>
execNodes.add(node);
finishedNodes.add(node);
addNodeByJobType(node, jobNodes, execNodes, eliminate);
-
- // } else {
- // if (DEBUG)
- // System.out.println("Queueing -" + node.toString());
- // queuedNodes.add(node);
- // removeNodesForNextIteration(node, finishedNodes,
- // execNodes, queuedNodes, jobNodes);
- // }
+
continue;
}
@@ -1319,10 +1159,10 @@ public class Dag<N extends Lop>
// add Scalar to execNodes if it has no child in exec nodes
// that will be executed in a MR job.
if (node.getExecLocation() == ExecLocation.ControlProgram) {
- for (int j = 0; j < node.getInputs().size(); j++) {
- if (execNodes.contains(node.getInputs().get(j))
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.Data)
- && !(node.getInputs().get(j).getExecLocation() == ExecLocation.ControlProgram)) {
+ for ( Lop lop : node.getInputs() ) {
+ if (execNodes.contains(lop)
+ && !(lop.getExecLocation() == ExecLocation.Data)
+ && !(lop.getExecLocation() == ExecLocation.ControlProgram)) {
if( LOG.isTraceEnabled() )
LOG.trace(indent + "Queueing -"+ node.toString() + " (code 9)");
@@ -1396,11 +1236,8 @@ public class Dag<N extends Lop>
// next generate MR instructions
if (!execNodes.isEmpty())
generateMRJobs(execNodes, inst, writeInst, deleteInst, jobNodes);
-
handleSingleOutputJobs(execNodes, jobNodes, finishedNodes);
-
}
-
}
// add write and delete inst at the very end.
@@ -1415,9 +1252,7 @@ public class Dag<N extends Lop>
}
private boolean compatibleWithChildrenInExecNodes(ArrayList<N> execNodes, N node) {
- for(int i=0; i < execNodes.size(); i++)
- {
- N tmpNode = execNodes.get(i);
+ for( N tmpNode : execNodes ) {
// for lops that execute in control program, compatibleJobs property is set to LopProperties.INVALID
// we should not consider such lops in this check
if (isChild(tmpNode, node, IDMap)
@@ -1466,11 +1301,6 @@ public class Dag<N extends Lop>
else {
processConsumers((N)in, inst, delteInst, null);
}
-
- /*if ( in.removeConsumer() == 0 ) {
- String label = in.getOutputParameters().getLabel();
- inst.add(VariableCPInstruction.prepareRemoveInstruction(label));
- }*/
}
}
@@ -1506,7 +1336,6 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
-
@SuppressWarnings("unchecked")
private void generateControlProgramJobs(ArrayList<N> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeInst, ArrayList<Instruction> deleteInst) throws LopsException,
@@ -1568,8 +1397,6 @@ public class Dag<N extends Lop>
else {
var_deletions.add(node.getOutputParameters().getLabel());
var_deletionsLineNum.put(node.getOutputParameters().getLabel(), node);
-
- //System.out.println(" --> skipping " + out.getLastInstructions() + " while processing node " + node.getID());
}
}
@@ -1768,10 +1595,9 @@ public class Dag<N extends Lop>
}
// delete all marked nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- execNodes.remove(markedNodes.get(i));
+ for ( N node : markedNodes ) {
+ execNodes.remove(node);
}
-
}
/**
@@ -1784,7 +1610,6 @@ public class Dag<N extends Lop>
* @param queuedNodes
* @throws LopsException
*/
-
private void removeNodesForNextIteration(N node, ArrayList<N> finishedNodes,
ArrayList<N> execNodes, ArrayList<N> queuedNodes,
ArrayList<ArrayList<N>> jobvec) throws LopsException {
@@ -1794,10 +1619,9 @@ public class Dag<N extends Lop>
return;
//if all children are queued, then there is nothing to do.
- int numInputs = node.getInputs().size();
boolean allQueued = true;
- for(int i=0; i < numInputs; i++) {
- if( !queuedNodes.contains(node.getInputs().get(i)) ) {
+ for( Lop input : node.getInputs() ) {
+ if( !queuedNodes.contains(input) ) {
allQueued = false;
break;
}
@@ -1811,8 +1635,8 @@ public class Dag<N extends Lop>
// Determine if <code>node</code> has inputs from the same job or multiple jobs
int jobid = Integer.MIN_VALUE;
boolean inputs_in_same_job = true;
- for(int idx=0; idx < node.getInputs().size(); idx++) {
- int input_jobid = jobType(node.getInputs().get(idx), jobvec);
+ for( Lop input : node.getInputs() ) {
+ int input_jobid = jobType(input, jobvec);
if ( jobid == Integer.MIN_VALUE )
jobid = input_jobid;
else if ( jobid != input_jobid ) {
@@ -1824,8 +1648,7 @@ public class Dag<N extends Lop>
// Determine if there exist any unassigned inputs to <code>node</code>
// Evaluate only those lops that execute in MR.
boolean unassigned_inputs = false;
- for(int i=0; i < numInputs; i++) {
- Lop input = node.getInputs().get(i);
+ for( Lop input : node.getInputs() ) {
//if ( input.getExecLocation() != ExecLocation.ControlProgram && jobType(input, jobvec) == -1 ) {
if ( input.getExecType() == ExecType.MR && !execNodes.contains(input)) { //jobType(input, jobvec) == -1 ) {
unassigned_inputs = true;
@@ -1835,8 +1658,8 @@ public class Dag<N extends Lop>
// Determine if any node's children are queued
boolean child_queued = false;
- for(int i=0; i < numInputs; i++) {
- if (queuedNodes.contains(node.getInputs().get(i)) ) {
+ for( Lop input : node.getInputs() ) {
+ if (queuedNodes.contains(input) ) {
child_queued = true;
break;
}
@@ -1940,14 +1763,16 @@ public class Dag<N extends Lop>
} // for i
// we also need to delete all parent nodes of marked nodes
- for (int i = 0; i < execNodes.size(); i++) {
- LOG.trace(" Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes) && !markedNodes.contains(execNodes.get(i))) {
- markedNodes.add(execNodes.get(i));
- LOG.trace(" Removing for next iteration (code 6) (" + execNodes.get(i).getID() + ") " + execNodes.get(i).toString());
+ for ( N enode : execNodes ) {
+ if( LOG.isTraceEnabled() ) {
+ LOG.trace(" Checking for removal - ("
+ + enode.getID() + ") " + enode.toString());
+ }
+
+ if (hasChildNode(enode, markedNodes) && !markedNodes.contains(enode)) {
+ markedNodes.add(enode);
+ if( LOG.isTraceEnabled() )
+ LOG.trace(" Removing for next iteration (code 6) (" + enode.getID() + ") " + enode.toString());
}
}
@@ -1963,193 +1788,6 @@ public class Dag<N extends Lop>
queuedNodes.add(n);
}
}
- /*for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.elementAt(i));
- execNodes.remove(markedNodes.elementAt(i));
- removeNodeByJobType(markedNodes.elementAt(i), jobvec);
- queuedNodes.add(markedNodes.elementAt(i));
- }*/
- }
-
- @SuppressWarnings("unused")
- private void removeNodesForNextIterationOLD(N node, ArrayList<N> finishedNodes,
- ArrayList<N> execNodes, ArrayList<N> queuedNodes,
- ArrayList<ArrayList<N>> jobvec) throws LopsException {
- // only queued nodes with two inputs need to be handled.
-
- // TODO: statiko -- this should be made == 1
- if (node.getInputs().size() != 2)
- return;
-
-
- //if both children are queued, nothing to do.
- if (queuedNodes.contains(node.getInputs().get(0))
- && queuedNodes.contains(node.getInputs().get(1)))
- return;
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Before remove nodes for next iteration -- size of execNodes " + execNodes.size());
-
-
-
- boolean inputs_in_same_job = false;
- // TODO: handle tertiary
- if (jobType(node.getInputs().get(0), jobvec) != -1
- && jobType(node.getInputs().get(0), jobvec) == jobType(node
- .getInputs().get(1), jobvec)) {
- inputs_in_same_job = true;
- }
-
- boolean unassigned_inputs = false;
- // here.. scalars shd be ignored
- if ((node.getInputs().get(0).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(0), jobvec) == -1)
- || (node.getInputs().get(1).getExecLocation() != ExecLocation.ControlProgram && jobType(
- node.getInputs().get(1), jobvec) == -1)) {
- unassigned_inputs = true;
- }
-
- boolean child_queued = false;
-
- // check if atleast one child was queued.
- if (queuedNodes.contains(node.getInputs().get(0))
- || queuedNodes.contains(node.getInputs().get(1)))
- child_queued = true;
-
- // nodes to be dropped
- ArrayList<N> markedNodes = new ArrayList<N>();
-
- for (int i = 0; i < execNodes.size(); i++) {
-
- N tmpNode = execNodes.get(i);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Checking for removal (" + tmpNode.getID()
- + ") " + tmpNode.toString());
- LOG.trace("Inputs in same job " + inputs_in_same_job);
- LOG.trace("Unassigned inputs " + unassigned_inputs);
- LOG.trace("Child queued " + child_queued);
-
- }
-
- //if(node.definesMRJob() && isChild(tmpNode,node) && (tmpNode.getCompatibleJobs() & node.getCompatibleJobs()) == 0)
- // continue;
-
- // TODO: statiko -- check if this is too conservative?
- if (child_queued) {
- // if one of the children are queued,
- // remove some child nodes on other leg that may be needed later on.
- // For e.g. Group lop.
-
- if((tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- markedNodes.add(tmpNode);
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
- }
- else {
- if (!hasOtherQueuedParentNode(tmpNode, queuedNodes, node)
- && isChild(tmpNode, node, IDMap) && branchHasNoOtherUnExecutedParents(tmpNode, node, execNodes, finishedNodes)) {
-
-
- if(
- //e.g. MMCJ
- (node.getExecLocation() == ExecLocation.MapAndReduce &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes) && !tmpNode.definesMRJob() )
- ||
- //e.g. Binary
- (node.getExecLocation() == ExecLocation.Reduce && branchCanBePiggyBackedReduce(tmpNode, node, execNodes, finishedNodes)) )
- {
-
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
-
- markedNodes.add(tmpNode);
- }
- }
- }
- } else {
- /*
- * "node" has no other queued children.
- *
- * If inputs are in the same job and "node" is of type
- * MapAndReduce, then remove nodes of all types other than
- * Reduce, MapAndReduce, and the ones that define a MR job as
- * they can be piggybacked later.
- *
- * e.g: A=Rand, B=Rand, C=A%*%B Here, both inputs of MMCJ lop
- * come from Rand job, and they should not be removed.
- *
- * Other examples: -- MMCJ whose children are of type
- * MapAndReduce (say GMR) -- Inputs coming from two different
- * jobs .. GMR & REBLOCK
- */
- if ((inputs_in_same_job || unassigned_inputs)
- && node.getExecLocation() == ExecLocation.MapAndReduce
- && !hasOtherMapAndReduceParentNode(tmpNode, execNodes,
- node)
- && isChild(tmpNode, node, IDMap) &&
- branchCanBePiggyBackedMapAndReduce(tmpNode, node, execNodes, finishedNodes)
- && !tmpNode.definesMRJob()) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration:: ("
- + tmpNode.getID() + ") " + tmpNode.toString());
-
- markedNodes.add(tmpNode);
- }
-
- // as this node has inputs coming from different jobs, need to
- // free up everything
- // below and include the closest MapAndReduce lop if this is of
- // type Reduce.
- // if it is of type MapAndReduce, don't need to free any nodes
-
- if (!inputs_in_same_job && !unassigned_inputs
- && isChild(tmpNode, node, IDMap) &&
- (tmpNode == node.getInputs().get(0) || tmpNode == node.getInputs().get(1)) &&
- tmpNode.isAligner())
- {
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration ("
- + tmpNode.getID()
- + ") "
- + tmpNode.toString());
-
- markedNodes.add(tmpNode);
-
- }
-
- }
- }
-
- // we also need to delete all parent nodes of marked nodes
- for (int i = 0; i < execNodes.size(); i++) {
- if( LOG.isTraceEnabled() )
- LOG.trace("Checking for removal - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
-
- if (hasChildNode(execNodes.get(i), markedNodes)) {
- markedNodes.add(execNodes.get(i));
- if( LOG.isTraceEnabled() )
- LOG.trace("Removing for next iteration - ("
- + execNodes.get(i).getID() + ") "
- + execNodes.get(i).toString());
- }
- }
-
- // delete marked nodes from finishedNodes and execNodes
- // add to queued nodes
- for (int i = 0; i < markedNodes.size(); i++) {
- finishedNodes.remove(markedNodes.get(i));
- execNodes.remove(markedNodes.get(i));
- removeNodeByJobType(markedNodes.get(i), jobvec);
- queuedNodes.add(markedNodes.get(i));
- }
}
private boolean branchCanBePiggyBackedReduce(N tmpNode, N node, ArrayList<N> execNodes, ArrayList<N> queuedNodes) {
@@ -2162,9 +1800,7 @@ public class Dag<N extends Lop>
return false;
}
- for(int i=0; i < execNodes.size(); i++) {
- N n = execNodes.get(i);
-
+ for( N n : execNodes ) {
if(n.equals(node))
continue;
@@ -2177,10 +1813,6 @@ public class Dag<N extends Lop>
&& n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
return false;
}
- /*if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap)
- && !node.getInputs().contains(tmpNode)
- && n.getExecLocation() != ExecLocation.Map && n.getExecLocation() != ExecLocation.MapOrReduce)
- return false;*/
}
return true;
}
@@ -2258,9 +1890,7 @@ public class Dag<N extends Lop>
return false;
JobType jt = JobType.findJobTypeFromLop(node);
- for (int i = 0; i < execNodes.size(); i++) {
- N n = execNodes.get(i);
-
+ for ( N n : execNodes ) {
if (n.equals(node))
continue;
@@ -2274,16 +1904,6 @@ public class Dag<N extends Lop>
else if (!isCompatible(n, jt))
return false;
}
-
- /*
- * if(n.equals(tmpNode) && n.getExecLocation() != ExecLocation.Map
- * && n.getExecLocation() != ExecLocation.MapOrReduce) return false;
- *
- * if(isChild(n, node, IDMap) && isChild(tmpNode, n, IDMap) &&
- * n.getExecLocation() != ExecLocation.Map && n.getExecLocation() !=
- * ExecLocation.MapOrReduce) return false;
- */
-
}
return true;
}
@@ -2305,10 +1925,7 @@ public class Dag<N extends Lop>
}
//check to see if any node between node and tmpNode has more than one unfinished output
- for(int i=0; i < execNodes.size(); i++)
- {
- N n = execNodes.get(i);
-
+ for( N n : execNodes ) {
if(n.equals(node) || n.equals(tmpNode))
continue;
@@ -2328,36 +1945,6 @@ public class Dag<N extends Lop>
return true;
}
- /**
- * Method to check of there is a node of type MapAndReduce between a child
- * (tmpNode) and its parent (node)
- *
- * @param tmpNode
- * @param execNodes
- * @param node
- * @return
- */
-
- @SuppressWarnings({ "unchecked", "unused" })
- private boolean hasMapAndReduceParentNode(N tmpNode, ArrayList<N> execNodes, N node)
- {
- for (int i = 0; i < tmpNode.getOutputs().size(); i++) {
- N n = (N) tmpNode.getOutputs().get(i);
-
- if (execNodes.contains(n)
- && n.getExecLocation() == ExecLocation.MapAndReduce
- && isChild(n, node, IDMap)) {
- return true;
- } else {
- if (hasMapAndReduceParentNode(n, execNodes, node))
- return true;
- }
-
- }
-
- return false;
- }
-
/**
* Method to return the job index for a lop.
*
@@ -2366,7 +1953,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
private int jobType(Lop lops, ArrayList<ArrayList<N>> jobvec) throws LopsException {
for ( JobType jt : JobType.values()) {
int i = jt.getId();
@@ -2386,7 +1972,6 @@ public class Dag<N extends Lop>
* @param node
* @return
*/
-
@SuppressWarnings("unchecked")
private boolean hasOtherMapAndReduceParentNode(N tmpNode,
ArrayList<N> nodeList, N node) {
@@ -2425,10 +2010,9 @@ public class Dag<N extends Lop>
long nodeid = IDMap.get(node.getID());
long tmpid = IDMap.get(tmpNode.getID());
- for ( int i=0; i < queuedNodes.size(); i++ ) {
- int id = IDMap.get(queuedNodes.get(i).getID());
+ for ( N qnode : queuedNodes ) {
+ int id = IDMap.get(qnode.getID());
if ((id != nodeid && nodeMarked[id]) && (id != tmpid && tmpMarked[id]) )
- //if (nodeMarked[id] && tmpMarked[id])
return true;
}
@@ -2441,8 +2025,9 @@ public class Dag<N extends Lop>
* @param jobNodes
* @throws DMLRuntimeException
*/
- void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
- throws DMLRuntimeException {
+ private void printJobNodes(ArrayList<ArrayList<N>> jobNodes)
+ throws DMLRuntimeException
+ {
if (LOG.isTraceEnabled()){
for ( JobType jt : JobType.values() ) {
int i = jt.getId();
@@ -2458,8 +2043,6 @@ public class Dag<N extends Lop>
}
}
-
-
}
/**
@@ -2469,7 +2052,7 @@ public class Dag<N extends Lop>
* @param loc
* @return
*/
- boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
+ private boolean hasANode(ArrayList<N> nodes, ExecLocation loc) {
for (int i = 0; i < nodes.size(); i++) {
if (nodes.get(i).getExecLocation() == ExecLocation.RecordReader)
return true;
@@ -2477,8 +2060,8 @@ public class Dag<N extends Lop>
return false;
}
- ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes) {
-
+ private ArrayList<ArrayList<N>> splitGMRNodesByRecordReader(ArrayList<N> gmrnodes)
+ {
// obtain the list of record reader nodes
ArrayList<N> rrnodes = new ArrayList<N>();
for (int i = 0; i < gmrnodes.size(); i++) {
@@ -2542,8 +2125,7 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
* @throws DMLUnsupportedOperationException
*/
-
- public void generateMRJobs(ArrayList<N> execNodes,
+ private void generateMRJobs(ArrayList<N> execNodes,
ArrayList<Instruction> inst,
ArrayList<Instruction> writeinst,
ArrayList<Instruction> deleteinst, ArrayList<ArrayList<N>> jobNodes)
@@ -2551,17 +2133,6 @@ public class Dag<N extends Lop>
DMLRuntimeException
{
-
- /*// copy unassigned lops in execnodes to gmrnodes
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.elementAt(i);
- if (jobType(node, jobNodes) == -1) {
- jobNodes.get(JobType.GMR.getId()).add(node);
- addChildren(node, jobNodes.get(JobType.GMR.getId()),
- execNodes);
- }
- }*/
-
printJobNodes(jobNodes);
ArrayList<Instruction> rmvarinst = new ArrayList<Instruction>();
@@ -2632,61 +2203,22 @@ public class Dag<N extends Lop>
}
/**
- * Method to get the input format for a lop
- *
- * @param elementAt
- * @return
- * @throws LopsException
- */
-
- // This code is replicated in ReBlock.java
- @SuppressWarnings("unused")
- private Format getChildFormat(N node)
- throws LopsException
- {
- if (node.getOutputParameters().getFile_name() != null
- || node.getOutputParameters().getLabel() != null) {
- return node.getOutputParameters().getFormat();
- } else {
- if (node.getInputs().size() > 1)
- throw new LopsException(node.printErrorLocation() + "Should only have one child! \n");
- /*
- * Return the format of the child node (i.e., input lop) No need of
- * recursion here.. because 1) Reblock lop's input can either be
- * DataLop or some intermediate computation If it is Data then we
- * just take its format (TEXT or BINARY) If it is intermediate lop
- * then it is always BINARY since we assume that all intermediate
- * computations will be in Binary format 2) Note that Reblock job
- * will never have any instructions in the mapper => the input lop
- * (if it is other than Data) is always executed in a different job
- */
- return node.getInputs().get(0).getOutputParameters().getFormat();
- // return getChildFormat((N) node.getInputs().get(0));
- }
-
- }
-
- /**
* Method to add all parents of "node" in exec_n to node_v.
*
* @param node
* @param node_v
* @param exec_n
*/
-
private void addParents(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
- for (int i = 0; i < exec_n.size(); i++) {
- if (isChild(node, exec_n.get(i), IDMap)) {
- if (!node_v.contains(exec_n.get(i))) {
+ for (N enode : exec_n ) {
+ if (isChild(node, enode, IDMap)) {
+ if (!node_v.contains(enode)) {
if( LOG.isTraceEnabled() )
- LOG.trace("Adding parent - "
- + exec_n.get(i).toString());
- node_v.add(exec_n.get(i));
+ LOG.trace("Adding parent - " + enode.toString());
+ node_v.add(enode);
}
}
-
}
-
}
/**
@@ -2696,11 +2228,10 @@ public class Dag<N extends Lop>
* @param node_v
* @param exec_n
*/
-
@SuppressWarnings("unchecked")
private void addChildren(N node, ArrayList<N> node_v, ArrayList<N> exec_n) {
- /** add child in exec nodes that is not of type scalar **/
+ // add child in exec nodes that is not of type scalar
if (exec_n.contains(node)
&& node.getExecLocation() != ExecLocation.ControlProgram) {
if (!node_v.contains(node)) {
@@ -2714,9 +2245,7 @@ public class Dag<N extends Lop>
if (!exec_n.contains(node))
return;
- /**
- * recurse
- */
+ // recurse
for (int i = 0; i < node.getInputs().size(); i++) {
N n = (N) node.getInputs().get(i);
addChildren(n, node_v, exec_n);
@@ -2730,7 +2259,7 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
- OutputInfo getOutputInfo(N node, boolean cellModeOverride)
+ private OutputInfo getOutputInfo(N node, boolean cellModeOverride)
throws LopsException
{
if ( (node.getDataType() == DataType.SCALAR && node.getExecType() == ExecType.CP)
@@ -2797,22 +2326,8 @@ public class Dag<N extends Lop>
return oinfo;
}
-
- /**
- * Method that determines the output format for a given node,
- * and returns a string representation of OutputInfo. This
- * method is primarily used while generating instructions that
- * execute in the control program.
- *
- * @param node
- * @return
- */
-/* String getOutputFormat(N node) {
- return OutputInfo.outputInfoToString(getOutputInfo(node, false));
- }
-*/
- public String prepareAssignVarInstruction(Lop input, Lop node) {
+ private String prepareAssignVarInstruction(Lop input, Lop node) {
StringBuilder sb = new StringBuilder();
sb.append(ExecType.CP);
@@ -3295,7 +2810,7 @@ public class Dag<N extends Lop>
* @throws DMLRuntimeException
*/
@SuppressWarnings("unchecked")
- public void generateMapReduceInstructions(ArrayList<N> execNodes,
+ private void generateMapReduceInstructions(ArrayList<N> execNodes,
ArrayList<Instruction> inst, ArrayList<Instruction> writeinst, ArrayList<Instruction> deleteinst, ArrayList<Instruction> rmvarinst,
JobType jt) throws LopsException,
DMLUnsupportedOperationException, DMLRuntimeException
@@ -3339,19 +2854,18 @@ public class Dag<N extends Lop>
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
ArrayList<N> markedNodes = new ArrayList<N>();
// only keep data nodes that are results of some computation.
- for (int i = 0; i < rootNodes.size(); i++) {
- N node = rootNodes.get(i);
- if (node.getExecLocation() == ExecLocation.Data
- && ((Data) node).isTransient()
- && ((Data) node).getOperationType() == OperationTypes.WRITE
- && ((Data) node).getDataType() == DataType.MATRIX) {
+ for ( N rnode : rootNodes ) {
+ if (rnode.getExecLocation() == ExecLocation.Data
+ && ((Data) rnode).isTransient()
+ && ((Data) rnode).getOperationType() == OperationTypes.WRITE
+ && ((Data) rnode).getDataType() == DataType.MATRIX) {
// no computation, just a copy
- if (node.getInputs().get(0).getExecLocation() == ExecLocation.Data
- && ((Data) node.getInputs().get(0)).isTransient()
- && node.getOutputParameters().getLabel().compareTo(
- node.getInputs().get(0)
- .getOutputParameters().getLabel()) == 0) {
- markedNodes.add(node);
+ if (rnode.getInputs().get(0).getExecLocation() == ExecLocation.Data
+ && ((Data) rnode.getInputs().get(0)).isTransient()
+ && rnode.getOutputParameters().getLabel().compareTo(
+ rnode.getInputs().get(0).getOutputParameters().getLabel()) == 0)
+ {
+ markedNodes.add(rnode);
}
}
}
@@ -3367,10 +2881,9 @@ public class Dag<N extends Lop>
/* Determine all input data files */
- for (int i = 0; i < rootNodes.size(); i++) {
- getInputPathsAndParameters(rootNodes.get(i), execNodes,
- inputs, inputInfos, numRows, numCols, numRowsPerBlock,
- numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
+ for ( N rnode : rootNodes ) {
+ getInputPathsAndParameters(rnode, execNodes, inputs, inputInfos, numRows, numCols,
+ numRowsPerBlock, numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
// In case of RAND job, instructions are defined in the input file
@@ -3384,10 +2897,9 @@ public class Dag<N extends Lop>
// currently, recordreader instructions are allowed only in GMR jobs
if (jt == JobType.GMR || jt == JobType.GMRCELL) {
- for (int i = 0; i < rootNodes.size(); i++) {
- getRecordReaderInstructions(rootNodes.get(i), execNodes,
- inputs, recordReaderInstructions, nodeIndexMapping,
- start_index, inputLabels, inputLops, MRJobLineNumbers);
+ for ( N rnode : rootNodes ) {
+ getRecordReaderInstructions(rnode, execNodes, inputs, recordReaderInstructions,
+ nodeIndexMapping, start_index, inputLabels, inputLops, MRJobLineNumbers);
if ( recordReaderInstructions.size() > 1 )
throw new LopsException("MapReduce job can only have a single recordreader instruction: " + recordReaderInstructions.toString());
}
@@ -3544,23 +3056,6 @@ public class Dag<N extends Lop>
processConsumers((N)l, rmvarinst, deleteinst, null);
}
}
-
- }
-
- /**
- * Convert a byte array into string
- *
- * @param arr
- * @return none
- */
- public String getString(byte[] arr) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < arr.length; i++) {
- sb.append(",");
- sb.append(Byte.toString(arr[i]));
- }
-
- return sb.toString();
}
/**
@@ -3571,16 +3066,14 @@ public class Dag<N extends Lop>
*/
private String getCSVString(ArrayList<String> inputStrings) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < inputStrings.size(); i++) {
- String tmp = inputStrings.get(i);
- if( tmp != null ) {
+ for ( String str : inputStrings ) {
+ if( str != null ) {
if( sb.length()>0 )
sb.append(Lop.INSTRUCTION_DELIMITOR);
- sb.append( tmp );
+ sb.append( str );
}
}
return sb.toString();
-
}
/**
@@ -3589,7 +3082,6 @@ public class Dag<N extends Lop>
* @param list
* @return
*/
-
private String[] getStringArray(ArrayList<String> list) {
String[] arr = new String[list.size()];
@@ -3615,7 +3107,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getAggAndOtherInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> shuffleInstructions,
@@ -3624,9 +3115,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
int ret_val = -1;
if (nodeIndexMapping.containsKey(node))
@@ -3813,9 +3302,6 @@ public class Dag<N extends Lop>
start_index[0]++;
if (node.getType() == Type.Ternary ) {
- //Tertiary.OperationTypes op = ((Tertiary<?, ?, ?, ?>) node).getOperationType();
- //if ( op == Tertiary.OperationTypes.CTABLE_TRANSFORM ) {
-
// in case of CTABLE_TRANSFORM_SCALAR_WEIGHT: inputIndices.get(2) would be -1
otherInstructionsReducer.add(node.getInstructions(
inputIndices.get(0), inputIndices.get(1),
@@ -3824,7 +3310,6 @@ public class Dag<N extends Lop>
MRJobLineNumbers.add(node._beginLine);
}
nodeIndexMapping.put(node, output_index);
- //}
}
else if( node.getType() == Type.ParameterizedBuiltin ){
otherInstructionsReducer.add(node.getInstructions(
@@ -3848,7 +3333,6 @@ public class Dag<N extends Lop>
}
return output_index;
-
}
else if (inputIndices.size() == 4) {
int output_index = start_index[0];
@@ -3868,7 +3352,6 @@ public class Dag<N extends Lop>
}
return -1;
-
}
/**
@@ -3885,7 +3368,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getRecordReaderInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> inputStrings,
@@ -3893,9 +3375,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
// if input source, return index
if (nodeIndexMapping.containsKey(node))
return nodeIndexMapping.get(node);
@@ -3910,7 +3390,6 @@ public class Dag<N extends Lop>
// get mapper instructions
for (int i = 0; i < node.getInputs().size(); i++) {
-
// recurse
N childNode = (N) node.getInputs().get(i);
int ret_val = getRecordReaderInstructions(childNode, execNodes,
@@ -3932,17 +3411,14 @@ public class Dag<N extends Lop>
// cannot reuse index if this is true
// need to add better indexing schemes
- // if (child_for_max_input_index.getOutputs().size() > 1) {
output_index = start_index[0];
start_index[0]++;
- // }
nodeIndexMapping.put(node, output_index);
// populate list of input labels.
// only Ranagepick lop can contribute to labels
if (node.getType() == Type.PickValues) {
-
PickByCount pbc = (PickByCount) node;
if (pbc.getOperationType() == PickByCount.OperationTypes.RANGEPICK) {
int scalarIndex = 1; // always the second input is a scalar
@@ -3975,11 +3451,9 @@ public class Dag<N extends Lop>
"Unexpected number of inputs while generating a RecordReader Instruction");
return output_index;
-
}
return -1;
-
}
/**
@@ -3996,7 +3470,6 @@ public class Dag<N extends Lop>
* @return
* @throws LopsException
*/
-
@SuppressWarnings("unchecked")
private int getMapperInstructions(N node, ArrayList<N> execNodes,
ArrayList<String> inputStrings,
@@ -4004,9 +3477,7 @@ public class Dag<N extends Lop>
HashMap<N, Integer> nodeIndexMapping, int[] start_index,
ArrayList<String> inputLabels, ArrayList<Lop> inputLops,
ArrayList<Integer> MRJobLineNumbers) throws LopsException
-
{
-
// if input source, return index
if (nodeIndexMapping.containsKey(node))
return nodeIndexMapping.get(node);
@@ -4134,11 +3605,9 @@ public class Dag<N extends Lop>
MRJobLineNumbers.add(node._beginLine);
}
return output_index;
-
}
return -1;
-
}
// Method to populate inputs and also populates node index mapping.
@@ -4155,12 +3624,9 @@ public class Dag<N extends Lop>
&& !nodeIndexMapping.containsKey(node)) {
numRows.add(node.getOutputParameters().getNumRows());
numCols.add(node.getOutputParameters().getNumCols());
- numRowsPerBlock.add(node.getOutputParameters()
- .getRowsInBlock());
- numColsPerBlock.add(node.getOutputParameters()
- .getColsInBlock());
- inputStrings.add(node.getInstructions(inputStrings.size(),
- inputStrings.size()));
+ numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+ numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
+ inputStrings.add(node.getInstructions(inputStrings.size(), inputStrings.size()));
if(DMLScript.ENABLE_DEBUG_MODE) {
MRJobLineNumbers.add(node._beginLine);
}
@@ -4170,10 +3636,6 @@ public class Dag<N extends Lop>
return;
}
- // && ( !(node.getExecLocation() == ExecLocation.ControlProgram)
- // || (node.getExecLocation() == ExecLocation.ControlProgram &&
- // node.getDataType() != DataType.SCALAR )
- // )
// get input file names
if (!execNodes.contains(node)
&& !nodeIndexMapping.containsKey(node)
@@ -4192,19 +3654,14 @@ public class Dag<N extends Lop>
inputStrings.add(Lop.VARIABLE_NAME_PLACEHOLDER + node.getOutputParameters().getLabel()
+ Lop.VARIABLE_NAME_PLACEHOLDER);
}
- //if ( node.getType() == Lops.Type.Data && ((Data)node).isTransient())
- // inputStrings.add("##" + node.getOutputParameters().getLabel() + "##");
- //else
- // inputStrings.add(node.getOutputParameters().getLabel());
+
inputLabels.add(node.getOutputParameters().getLabel());
inputLops.add(node);
numRows.add(node.getOutputParameters().getNumRows());
numCols.add(node.getOutputParameters().getNumCols());
- numRowsPerBlock.add(node.getOutputParameters()
- .getRowsInBlock());
- numColsPerBlock.add(node.getOutputParameters()
- .getColsInBlock());
+ numRowsPerBlock.add(node.getOutputParameters().getRowsInBlock());
+ numColsPerBlock.add(node.getOutputParameters().getColsInBlock());
InputInfo nodeInputInfo = null;
// Check if file format type is binary or text and update infos
@@ -4262,15 +3719,12 @@ public class Dag<N extends Lop>
}
// if exec nodes does not contain node at this point, return.
-
if (!execNodes.contains(node))
return;
// process children recursively
-
- for (int i = 0; i < node.getInputs().size(); i++) {
- N childNode = (N) node.getInputs().get(i);
- getInputPathsAndParameters(childNode, execNodes, inputStrings,
+ for ( Lop lop : node.getInputs() ) {
+ getInputPathsAndParameters((N)lop, execNodes, inputStrings,
inputInfos, numRows, numCols, numRowsPerBlock,
numColsPerBlock, nodeIndexMapping, inputLabels, inputLops, MRJobLineNumbers);
}
@@ -4283,39 +3737,32 @@ public class Dag<N extends Lop>
* @param execNodes
* @param rootNodes
*/
-
private void getOutputNodes(ArrayList<N> execNodes, ArrayList<N> rootNodes, JobType jt) {
- for (int i = 0; i < execNodes.size(); i++) {
- N node = execNodes.get(i);
-
+ for ( N node : execNodes ) {
// terminal node
if (node.getOutputs().isEmpty() && !rootNodes.contains(node)) {
rootNodes.add(node);
- } else {
+ }
+ else {
// check for nodes with at least one child outside execnodes
int cnt = 0;
- for (int j = 0; j < node.getOutputs().size(); j++) {
- if (!execNodes.contains(node.getOutputs().get(j))) {
- cnt++;
- }
+ for (Lop lop : node.getOutputs() ) {
+ cnt += (!execNodes.contains(lop)) ? 1 : 0;
}
- if (cnt > 0 //!= 0
- //&& cnt <= node.getOutputs().size()
- && !rootNodes.contains(node) // not already a rootnode
- && !(node.getExecLocation() == ExecLocation.Data
- && ((Data) node).getOperationType() == OperationTypes.READ && ((Data) node)
- .getDataType() == DataType.MATRIX) // Not a matrix Data READ
- ) {
-
-
+ if (cnt > 0 && !rootNodes.contains(node) // not already a rootnode
+ && !(node.getExecLocation() == ExecLocation.Data
+ && ((Data) node).getOperationType() == OperationTypes.READ
+ && ((Data) node).getDataType() == DataType.MATRIX) ) // Not a matrix Data READ
+ {
if ( jt.allowsSingleShuffleInstruction() && node.getExecLocation() != ExecLocation.MapAndReduce)
continue;
if (cnt < node.getOutputs().size()) {
if(!node.getProducesIntermediateOutput())
rootNodes.add(node);
- } else
+ }
+ else
rootNodes.add(node);
}
}
@@ -4328,9 +3775,7 @@ public class Dag<N extends Lop>
* @param a
* @param b
*/
-
- public static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
- //int aID = IDMap.get(a.getID());
+ private static boolean isChild(Lop a, Lop b, HashMap<Long, Integer> IDMap) {
int bID = IDMap.get(b.getID());
return a.get_reachable()[bID];
}
@@ -4381,17 +3826,15 @@ public class Dag<N extends Lop>
// print the nodes in sorted order
if (LOG.isTraceEnabled()) {
- for (int i = 0; i < v.size(); i++) {
- // System.out.print(sortedNodes.get(i).getID() + "("
- // + levelmap.get(sortedNodes.get(i).getID()) + "), ");
+ for ( N vnode : v ) {
StringBuilder sb = new StringBuilder();
- sb.append(v.get(i).getID());
+ sb.append(vnode.getID());
sb.append("(");
- sb.append(v.get(i).getLevel());
+ sb.append(vnode.getLevel());
sb.append(") ");
- sb.append(v.get(i).getType());
+ sb.append(vnode.getType());
sb.append("(");
- for(Lop vin : v.get(i).getInputs()) {
+ for(Lop vin : vnode.getInputs()) {
sb.append(vin.getID());
sb.append(",");
}
@@ -4413,7 +3856,7 @@ public class Dag<N extends Lop>
* @param marked
*/
@SuppressWarnings("unchecked")
- void dagDFS(N root, boolean[] marked) {
+ private void dagDFS(N root, boolean[] marked) {
//contains check currently required for globalopt, will be removed when cleaned up
if( !IDMap.containsKey(root.getID()) )
return;
@@ -4422,33 +3865,32 @@ public class Dag<N extends Lop>
if ( marked[mapID] )
return;
marked[mapID] = true;
- for(int i=0; i < root.getOutputs().size(); i++) {
- //System.out.println("CALLING DFS "+root);
- dagDFS((N)root.getOutputs().get(i), marked);
+ for( Lop lop : root.getOutputs() ) {
+ dagDFS((N)lop, marked);
}
}
private boolean hasDirectChildNode(N node, ArrayList<N> childNodes) {
if ( childNodes.isEmpty() )
return false;
-
- for(int i=0; i < childNodes.size(); i++) {
- if ( childNodes.get(i).getOutputs().contains(node))
+ for( N cnode : childNodes ) {
+ if ( cnode.getOutputs().contains(node))
return true;
}
return false;
}
+ private boolean hasChildNode(N node, ArrayList<N> nodes) {
+ return hasChildNode(node, nodes, ExecLocation.INVALID);
+ }
+
private boolean hasChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return false;
-
int index = IDMap.get(node.getID());
- for(int i=0; i < childNodes.size(); i++) {
- N cn = childNodes.get(i);
- if ( (type == ExecLocation.INVALID || cn.getExecLocation() == type) && cn.get_reachable()[index]) {
+ for( N cnode : childNodes ) {
+ if ( (type == ExecLocation.INVALID || cnode.getExecLocation() == type) && cnode.get_reachable()[index])
return true;
- }
}
return false;
}
@@ -4456,13 +3898,10 @@ public class Dag<N extends Lop>
private N getChildNode(N node, ArrayList<N> childNodes, ExecLocation type) {
if ( childNodes.isEmpty() )
return null;
-
int index = IDMap.get(node.getID());
- for(int i=0; i < childNodes.size(); i++) {
- N cn = childNodes.get(i);
- if ( cn.getExecLocation() == type && cn.get_reachable()[index]) {
- return cn;
- }
+ for( N cnode : childNodes ) {
+ if ( cnode.getExecLocation() == type && cnode.get_reachable()[index])
+ return cnode;
}
return null;
}
@@ -4479,8 +3918,7 @@ public class Dag<N extends Lop>
private N getParentNode(N node, ArrayList<N> parentNodes, ExecLocation type) {
if ( parentNodes.isEmpty() )
return null;
- for(int i=0; i < parentNodes.size(); i++ ) {
- N pn = parentNodes.get(i);
+ for( N pn : parentNodes ) {
int index = IDMap.get( pn.getID() );
if ( pn.getExecLocation() == type && node.get_reachable()[index])
return pn;
@@ -4495,9 +3933,7 @@ public class Dag<N extends Lop>
return false;
int index = IDMap.get(node.getID());
-
- for (int i = 0; i < nodesVec.size(); i++) {
- N n = nodesVec.get(i);
+ for( N n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index])
return true;
}
@@ -4510,8 +3946,7 @@ public class Dag<N extends Lop>
int index = IDMap.get(node.getID());
boolean onlyDatagen = true;
- for (int i = 0; i < nodesVec.size(); i++) {
- N n = nodesVec.get(i);
+ for( N n : nodesVec ) {
if ( n.definesMRJob() && n.get_reachable()[index] && JobType.findJobTypeFromLop(n) != JobType.DATAGEN )
onlyDatagen = false;
}
@@ -4520,11 +3955,10 @@ public class Dag<N extends Lop>
}
@SuppressWarnings("unchecked")
- private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type) {
-
- for (int i = 0; i < node.getInputs().size(); i++) {
- N n = (N) node.getInputs().get(i);
-
+ private int getChildAlignment(N node, ArrayList<N> execNodes, ExecLocation type)
+ {
+ for (Lop lop : node.getInputs() ) {
+ N n = (N) lop;
if (!execNodes.contains(n))
continue;
@@ -4533,44 +3967,36 @@ public class Dag<N extends Lop>
return MR_CHILD_FOUND_BREAKS_ALIGNMENT;
else
return MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT;
- } else {
+ }
+ else {
int ret = getChildAlignment(n, execNodes, type);
if (ret == MR_CHILD_FOUND_DOES_NOT_BREAK_ALIGNMENT
- || ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
+ || ret == CHILD_DOES_NOT_BREAK_ALIGNMENT) {
if (n.getBreaksAlignment())
return CHILD_BREAKS_ALIGNMENT;
else
return CHILD_DOES_NOT_BREAK_ALIGNMENT;
}
-
else if (ret == MRCHILD_NOT_FOUND
|| ret == CHILD_BREAKS_ALIGNMENT
|| ret == MR_CHILD_FOUND_BREAKS_ALIGNMENT)
return ret;
else
- throw new RuntimeException(
- "Something wrong in getChildAlignment().");
+ throw new RuntimeException("Something wrong in getChildAlignment().");
}
}
return MRCHILD_NOT_FOUND;
-
- }
-
- private boolean hasChildNode(N node, ArrayList<N> nodes) {
- return hasChildNode(node, nodes, ExecLocation.INVALID);
}
private boolean hasParentNode(N node, ArrayList<N> parentNodes) {
if ( parentNodes.isEmpty() )
- return false;
-
- for( int i=0; i < parentNodes.size(); i++ ) {
- int index = IDMap.get( parentNodes.get(i).getID() );
+ return false;
+ for( N pnode : parentNodes ) {
+ int index = IDMap.get( pnode.getID() );
if ( node.get_reachable()[index])
return true;
}
return false;
}
-
}