You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@systemds.apache.org by GitBox <gi...@apache.org> on 2021/12/13 17:25:21 UTC

[GitHub] [systemds] Baunsgaard commented on a change in pull request #1459: [SYSTEMDS-3221] Federated DataGENInstruction

Baunsgaard commented on a change in pull request #1459:
URL: https://github.com/apache/systemds/pull/1459#discussion_r767960730



##########
File path: src/main/java/org/apache/sysds/api/DMLScript.java
##########
@@ -277,6 +280,7 @@ public static boolean executeScript( Configuration conf, String[] args )
 			//reset runtime platform and visualize flag
 			setGlobalExecMode(oldrtplatform);
 			EXPLAIN = oldexplain;
+			DMLScript.FED_WORKER_PORTS.clear();

Review comment:
       I would prefer that this not be a variable in DMLScript, since it would not be available when executing from Python.
   
   maybe put it in: 
   src/main/java/org/apache/sysds/runtime/controlprogram/context/ExecutionContext.java
   or 
   not have it.
   
   Maybe the best design is to add another argument to rand, that allows you to specify a federated range of worker addresses.
   

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/FEDInstructionUtils.java
##########
@@ -145,6 +150,11 @@ else if (inst instanceof UnaryCPInstruction && ! (inst instanceof IndexingCPInst
 					&& mo.isFederatedExcept(FType.BROADCAST) )
 					fedinst = ReorgFEDInstruction.parseInstruction(
 						InstructionUtils.concatOperands(rinst.getInstructionString(),FederatedOutput.NONE.name()));
+			} else if(inst instanceof DataGenCPInstruction) {
+				DataGenCPInstruction dinst = (DataGenCPInstruction) inst;

Review comment:
       Maybe make a DataGenFedInstruction that contains the extra arguments you need.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/DataGenFEDInstruction.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.OpOpDG;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.DataGenOp;
+import org.apache.sysds.lops.DataGen;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.UtilFunctions;
+
+public class DataGenFEDInstruction extends UnaryFEDInstruction {
+	private static final Log LOG = LogFactory.getLog(DataGenFEDInstruction.class.getName());
+	private OpOpDG method;
+
+	private final CPOperand rows, cols, dims;
+	private final int blocksize;
+	private boolean minMaxAreDoubles;
+	private final String minValueStr, maxValueStr;
+	private final double minValue, maxValue, sparsity;
+	private final String pdf, pdfParams, frame_data, schema;
+	private final long seed;
+	private Long runtimeSeed;
+
+	// sequence specific attributes
+	private final CPOperand seq_from, seq_to, seq_incr;
+
+	// sample specific attributes
+	private final boolean replace;
+	private final int numThreads;
+
+	// seed positions
+	private static final int SEED_POSITION_RAND = 8;
+	private static final int SEED_POSITION_SAMPLE = 4;
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String minValue, String maxValue, double sparsity, long seed,
+		String probabilityDensityFunction, String pdfParams, int k, CPOperand seqFrom, CPOperand seqTo,
+		CPOperand seqIncr, boolean replace, String data, String schema, String opcode, String istr) {
+		super(FEDType.Rand, op, in, out, opcode, istr);
+		this.method = mthd;
+		this.rows = rows;
+		this.cols = cols;
+		this.dims = dims;
+		this.blocksize = blen;
+		this.minValueStr = minValue;
+		this.maxValueStr = maxValue;
+		double minDouble, maxDouble;
+		try {
+			minDouble = !minValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double.valueOf(minValue) : -1;
+			maxDouble = !maxValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double.valueOf(maxValue) : -1;
+			minMaxAreDoubles = true;
+		}
+		catch(NumberFormatException e) {
+			// Non double values
+			if(!minValueStr.equals(maxValueStr)) {
+				throw new DMLRuntimeException(
+					"Rand instruction does not support " + "non numeric Datatypes for range initializations.");
+			}
+			minDouble = -1;
+			maxDouble = -1;
+			minMaxAreDoubles = false;
+		}
+		this.minValue = minDouble;
+		this.maxValue = maxDouble;
+		this.sparsity = sparsity;
+		this.seed = seed;
+		this.pdf = probabilityDensityFunction;
+		this.pdfParams = pdfParams;
+		this.numThreads = k;
+		this.seq_from = seqFrom;
+		this.seq_to = seqTo;
+		this.seq_incr = seqIncr;
+		this.replace = replace;
+		this.frame_data = data;
+		this.schema = schema;
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String minValue, String maxValue, double sparsity, long seed,
+		String probabilityDensityFunction, String pdfParams, int k, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, minValue, maxValue, sparsity, seed, probabilityDensityFunction,
+			pdfParams, k, null, null, null, false, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String maxValue, boolean replace, long seed, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, "0", maxValue, 1.0, seed, null, null, 1, null, null, null,
+			replace, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, CPOperand seqFrom, CPOperand seqTo, CPOperand seqIncr, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, "0", "1", 1.0, -1, null, null, 1, seqFrom, seqTo, seqIncr,
+			false, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand out, String opcode, String istr) {
+		this(op, mthd, null, out, null, null, null, 0, "0", "0", 0, 0, null, null, 1, null, null, null, false, null,
+			null, opcode, istr);
+	}
+
+	public DataGenFEDInstruction(Operator op, OpOpDG method, CPOperand out, CPOperand rows, CPOperand cols, String data,
+		String schema, String opcode, String str) {
+		this(op, method, null, out, rows, cols, null, 0, "0", "0", 0, 0, null, null, 1, null, null, null, false, data,
+			schema, opcode, str);
+	}
+
+	public long getRows() {
+		return rows.isLiteral() ? UtilFunctions.parseToLong(rows.getName()) : -1;
+	}
+
+	public long getCols() {
+		return cols.isLiteral() ? UtilFunctions.parseToLong(cols.getName()) : -1;
+	}
+
+	public String getDims() {
+		return dims.getName();
+	}
+
+	public int getBlocksize() {
+		return blocksize;
+	}
+
+	public double getSparsity() {
+		return sparsity;
+	}
+
+	public String getPdf() {
+		return pdf;
+	}
+
+	public long getSeed() {
+		return seed;
+	}
+
+	public long getFrom() {
+		return seq_from.isLiteral() ? UtilFunctions.parseToLong(seq_from.getName()) : -1;
+	}
+
+	public long getTo() {
+		return seq_to.isLiteral() ? UtilFunctions.parseToLong(seq_to.getName()) : -1;
+	}
+
+	public long getIncr() {
+		return seq_incr.isLiteral() ? UtilFunctions.parseToLong(seq_incr.getName()) : -1;
+	}
+
+	public static DataGenFEDInstruction parseInstruction(String str) {
+		OpOpDG method = null;
+		String[] s = InstructionUtils.getInstructionPartsWithValueType(str);
+		String opcode = s[0];
+
+		if(opcode.equalsIgnoreCase(DataGen.RAND_OPCODE)) {
+			method = OpOpDG.RAND;
+			InstructionUtils.checkNumFields(s, 10, 11);
+		}
+		else
+			throw new DMLRuntimeException("DataGenFEDInstruction: only matrix rand(..) is supported.");
+
+		CPOperand out = new CPOperand(s[s.length - 1]);
+		Operator op = null;
+
+		if(method == OpOpDG.RAND) {
+			int missing; // number of missing params (row & cols or dims)
+			CPOperand rows = null, cols = null, dims = null;
+			if(s.length == 12) {
+				missing = 1;
+				rows = new CPOperand(s[1]);
+				cols = new CPOperand(s[2]);
+			}
+			else {
+				missing = 2;
+				dims = new CPOperand(s[1]);
+			}
+			int blen = Integer.parseInt(s[4 - missing]);
+			double sparsity = !s[7 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double
+				.parseDouble(s[7 - missing]) : -1;
+			long seed = !s[SEED_POSITION_RAND - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Long
+				.parseLong(s[SEED_POSITION_RAND - missing]) : -1;
+			String pdf = s[9 - missing];
+			String pdfParams = !s[10 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? s[10 - missing] : null;
+			int k = Integer.parseInt(s[11 - missing]);
+
+			return new DataGenFEDInstruction(op, method, null, out, rows, cols, dims, blen, s[5 - missing],
+				s[6 - missing], sparsity, seed, pdf, pdfParams, k, opcode, str);
+		}
+		else
+			throw new DMLRuntimeException("Unrecognized data generation method: " + method);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// process specific datagen operator
+		if(method == OpOpDG.RAND)
+			processRandInstruction(ec);
+	}
+
+	private MatrixObject processRandInstruction(ExecutionContext ec) {
+		MatrixObject out;
+
+		// derive number of workers to use
+		int nworkers = DMLScript.FED_WORKER_PORTS.size();
+		if(getRows() % nworkers != 0) {
+			if(Math.round(getRows() / (double) nworkers) == Math.floor(getRows() / (double) nworkers))
+				nworkers--;
+		}
+
+		// generate seeds and rows
+		long[] lSeeds = randomSeedsGenerator(nworkers);
+
+		// new sizes
+		int size = (int) Math.round(getRows() / (double) nworkers);
+		int addedRows = size * (nworkers -1);
+		long[] rowsPerWorker = new long[nworkers];
+		Arrays.fill(rowsPerWorker, 0, nworkers-1, size);
+		rowsPerWorker[nworkers-1] = getRows() - addedRows;

Review comment:
       we should specify the rows as well in the instruction, using the federated addresses, since these give the dimensions of each federated site.

##########
File path: src/test/java/org/apache/sysds/test/functions/federated/datagen/FederatedRandTest.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.federated.datagen;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.instructions.fed.FEDInstructionUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixValue;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(value = Parameterized.class)
+@net.jcip.annotations.NotThreadSafe
+public class FederatedRandTest extends AutomatedTestBase {
+
+	private final static String TEST_DIR = "functions/federated/datagen/";
+	private final static String TEST_NAME1 = "FederatedRandTest1";
+	private final static String TEST_NAME2 = "FederatedRandTest2";
+	private final static String TEST_NAME3 = "FederatedRandTest3";
+	private final static String TEST_CLASS_DIR = TEST_DIR + FederatedRandTest.class.getSimpleName() + "/";
+
+	private final static int blocksize = 1024;
+	@Parameterized.Parameter()
+	public int rows;
+	@Parameterized.Parameter(1)
+	public int cols;
+	@Parameterized.Parameter(2)
+	public double min;
+	@Parameterized.Parameter(3)
+	public double max;
+	@Parameterized.Parameter(4)
+	public double sparsity;
+	@Parameterized.Parameter(5)
+	public int seed;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"Z"}));
+		addTestConfiguration(TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] {"Z"}));
+		addTestConfiguration(TEST_NAME3, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME3, new String[] {"Z"}));
+	}
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> data() {
+		return Arrays.asList(new Object[][] {
+			{12, 10, 0, 1, 0.7, 123},
+			{1200, 10, -10, 10, 0.9, 123}
+		});
+	}
+
+	@Test
+	public void federatedRand1CP() {
+		federatedRand(TEST_NAME1, Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void federatedRand2CP() {
+		federatedRand(TEST_NAME2, Types.ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void federatedRand3CP() {
+		federatedRand(TEST_NAME3, Types.ExecMode.SINGLE_NODE);
+	}
+
+	public void federatedRand(String TEST_NAME, Types.ExecMode execMode) {
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		Types.ExecMode platformOld = rtplatform;
+		rtplatform = execMode;
+		if(rtplatform == Types.ExecMode.SPARK) {
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		}
+
+		getAndLoadTestConfiguration(TEST_NAME);
+		String HOME = SCRIPT_DIR + TEST_DIR;
+
+		// write input matrices
+		int halfRows = rows / 2;
+		// We have two matrices handled by a single federated worker
+		double[][] X1 = getRandomMatrix(halfRows, cols, 0, 1, 1, 42);
+		double[][] X2 = getRandomMatrix(halfRows, cols, 0, 1, 1, 1340);
+
+		writeInputMatrixWithMTD("X1", X1, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+		writeInputMatrixWithMTD("X2", X2, false, new MatrixCharacteristics(halfRows, cols, blocksize, halfRows * cols));
+
+		int port1 = getRandomAvailablePort();
+		int port2 = getRandomAvailablePort();
+		Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
+		Thread t2 = startLocalFedWorkerThread(port2);
+
+		TestConfiguration config = availableTestConfigurations.get(TEST_NAME);
+		loadTestConfiguration(config);
+
+		// Run reference dml script with normal matrix
+		fullDMLScriptName = HOME + TEST_NAME + "Reference.dml";
+		programArgs = new String[] {"-nvargs", input("X1"), input("X2"), expected("Z")};
+		runTest(null);
+
+		FEDInstructionUtils.fedDataGen = true;
+
+		// Run actual dml script with federated matrix
+		OptimizerUtils.FEDERATED_COMPILATION = true;
+		fullDMLScriptName = HOME + TEST_NAME + ".dml";
+
+		programArgs = new String[] {"-nvargs", "X1=" + TestUtils.federatedAddress(port1, input("X1")),
+			"X2=" + TestUtils.federatedAddress(port2, input("X2")),
+			"r=" + rows, "c=" + cols, "min=" + min, "max=" + max, "sp=" + sparsity, "seed=" + seed, "Z=" + output("Z")};
+		runTest(null);
+
+		HashMap<MatrixValue.CellIndex, Double> output = TestUtils.readDMLMatrixFromHDFS(outputDirectories[0]);
+		int rowOut = output.keySet().stream().max(Comparator.comparingInt(e -> e.row)).get().row;
+		int colOut = output.keySet().stream().max(Comparator.comparingInt(e -> e.column)).get().column;
+		double minOut = output.values().stream().min(Comparator.comparingDouble(Double::doubleValue)).get();
+		double maxOut = output.values().stream().max(Comparator.comparingDouble(Double::doubleValue)).get();

Review comment:
       This is not a good comparison, since it only verifies the max and min, and it also streams through all cells.
   Just use a simple comparison of the outputs.

##########
File path: src/main/java/org/apache/sysds/runtime/instructions/fed/DataGenFEDInstruction.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.fed;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.common.Types;
+import org.apache.sysds.common.Types.DataType;
+import org.apache.sysds.common.Types.OpOpDG;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.DataGenOp;
+import org.apache.sysds.lops.DataGen;
+import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
+import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
+import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
+import org.apache.sysds.runtime.controlprogram.federated.FederationUtils;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.UtilFunctions;
+
+public class DataGenFEDInstruction extends UnaryFEDInstruction {
+	private static final Log LOG = LogFactory.getLog(DataGenFEDInstruction.class.getName());
+	private OpOpDG method;
+
+	private final CPOperand rows, cols, dims;
+	private final int blocksize;
+	private boolean minMaxAreDoubles;
+	private final String minValueStr, maxValueStr;
+	private final double minValue, maxValue, sparsity;
+	private final String pdf, pdfParams, frame_data, schema;
+	private final long seed;
+	private Long runtimeSeed;
+
+	// sequence specific attributes
+	private final CPOperand seq_from, seq_to, seq_incr;
+
+	// sample specific attributes
+	private final boolean replace;
+	private final int numThreads;
+
+	// seed positions
+	private static final int SEED_POSITION_RAND = 8;
+	private static final int SEED_POSITION_SAMPLE = 4;
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String minValue, String maxValue, double sparsity, long seed,
+		String probabilityDensityFunction, String pdfParams, int k, CPOperand seqFrom, CPOperand seqTo,
+		CPOperand seqIncr, boolean replace, String data, String schema, String opcode, String istr) {
+		super(FEDType.Rand, op, in, out, opcode, istr);
+		this.method = mthd;
+		this.rows = rows;
+		this.cols = cols;
+		this.dims = dims;
+		this.blocksize = blen;
+		this.minValueStr = minValue;
+		this.maxValueStr = maxValue;
+		double minDouble, maxDouble;
+		try {
+			minDouble = !minValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double.valueOf(minValue) : -1;
+			maxDouble = !maxValue.contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double.valueOf(maxValue) : -1;
+			minMaxAreDoubles = true;
+		}
+		catch(NumberFormatException e) {
+			// Non double values
+			if(!minValueStr.equals(maxValueStr)) {
+				throw new DMLRuntimeException(
+					"Rand instruction does not support " + "non numeric Datatypes for range initializations.");
+			}
+			minDouble = -1;
+			maxDouble = -1;
+			minMaxAreDoubles = false;
+		}
+		this.minValue = minDouble;
+		this.maxValue = maxDouble;
+		this.sparsity = sparsity;
+		this.seed = seed;
+		this.pdf = probabilityDensityFunction;
+		this.pdfParams = pdfParams;
+		this.numThreads = k;
+		this.seq_from = seqFrom;
+		this.seq_to = seqTo;
+		this.seq_incr = seqIncr;
+		this.replace = replace;
+		this.frame_data = data;
+		this.schema = schema;
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String minValue, String maxValue, double sparsity, long seed,
+		String probabilityDensityFunction, String pdfParams, int k, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, minValue, maxValue, sparsity, seed, probabilityDensityFunction,
+			pdfParams, k, null, null, null, false, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, String maxValue, boolean replace, long seed, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, "0", maxValue, 1.0, seed, null, null, 1, null, null, null,
+			replace, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand in, CPOperand out, CPOperand rows, CPOperand cols,
+		CPOperand dims, int blen, CPOperand seqFrom, CPOperand seqTo, CPOperand seqIncr, String opcode, String istr) {
+		this(op, mthd, in, out, rows, cols, dims, blen, "0", "1", 1.0, -1, null, null, 1, seqFrom, seqTo, seqIncr,
+			false, null, null, opcode, istr);
+	}
+
+	private DataGenFEDInstruction(Operator op, OpOpDG mthd, CPOperand out, String opcode, String istr) {
+		this(op, mthd, null, out, null, null, null, 0, "0", "0", 0, 0, null, null, 1, null, null, null, false, null,
+			null, opcode, istr);
+	}
+
+	public DataGenFEDInstruction(Operator op, OpOpDG method, CPOperand out, CPOperand rows, CPOperand cols, String data,
+		String schema, String opcode, String str) {
+		this(op, method, null, out, rows, cols, null, 0, "0", "0", 0, 0, null, null, 1, null, null, null, false, data,
+			schema, opcode, str);
+	}
+
+	public long getRows() {
+		return rows.isLiteral() ? UtilFunctions.parseToLong(rows.getName()) : -1;
+	}
+
+	public long getCols() {
+		return cols.isLiteral() ? UtilFunctions.parseToLong(cols.getName()) : -1;
+	}
+
+	public String getDims() {
+		return dims.getName();
+	}
+
+	public int getBlocksize() {
+		return blocksize;
+	}
+
+	public double getSparsity() {
+		return sparsity;
+	}
+
+	public String getPdf() {
+		return pdf;
+	}
+
+	public long getSeed() {
+		return seed;
+	}
+
+	public long getFrom() {
+		return seq_from.isLiteral() ? UtilFunctions.parseToLong(seq_from.getName()) : -1;
+	}
+
+	public long getTo() {
+		return seq_to.isLiteral() ? UtilFunctions.parseToLong(seq_to.getName()) : -1;
+	}
+
+	public long getIncr() {
+		return seq_incr.isLiteral() ? UtilFunctions.parseToLong(seq_incr.getName()) : -1;
+	}
+
+	public static DataGenFEDInstruction parseInstruction(String str) {
+		OpOpDG method = null;
+		String[] s = InstructionUtils.getInstructionPartsWithValueType(str);
+		String opcode = s[0];
+
+		if(opcode.equalsIgnoreCase(DataGen.RAND_OPCODE)) {
+			method = OpOpDG.RAND;
+			InstructionUtils.checkNumFields(s, 10, 11);
+		}
+		else
+			throw new DMLRuntimeException("DataGenFEDInstruction: only matrix rand(..) is supported.");
+
+		CPOperand out = new CPOperand(s[s.length - 1]);
+		Operator op = null;
+
+		if(method == OpOpDG.RAND) {
+			int missing; // number of missing params (row & cols or dims)
+			CPOperand rows = null, cols = null, dims = null;
+			if(s.length == 12) {
+				missing = 1;
+				rows = new CPOperand(s[1]);
+				cols = new CPOperand(s[2]);
+			}
+			else {
+				missing = 2;
+				dims = new CPOperand(s[1]);
+			}
+			int blen = Integer.parseInt(s[4 - missing]);
+			double sparsity = !s[7 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Double
+				.parseDouble(s[7 - missing]) : -1;
+			long seed = !s[SEED_POSITION_RAND - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? Long
+				.parseLong(s[SEED_POSITION_RAND - missing]) : -1;
+			String pdf = s[9 - missing];
+			String pdfParams = !s[10 - missing].contains(Lop.VARIABLE_NAME_PLACEHOLDER) ? s[10 - missing] : null;
+			int k = Integer.parseInt(s[11 - missing]);
+
+			return new DataGenFEDInstruction(op, method, null, out, rows, cols, dims, blen, s[5 - missing],
+				s[6 - missing], sparsity, seed, pdf, pdfParams, k, opcode, str);
+		}
+		else
+			throw new DMLRuntimeException("Unrecognized data generation method: " + method);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// process specific datagen operator
+		if(method == OpOpDG.RAND)
+			processRandInstruction(ec);
+	}
+
+	private MatrixObject processRandInstruction(ExecutionContext ec) {
+		MatrixObject out;
+
+		// derive number of workers to use
+		int nworkers = DMLScript.FED_WORKER_PORTS.size();
+		if(getRows() % nworkers != 0) {
+			if(Math.round(getRows() / (double) nworkers) == Math.floor(getRows() / (double) nworkers))
+				nworkers--;
+		}
+
+		// generate seeds and rows
+		long[] lSeeds = randomSeedsGenerator(nworkers);
+
+		// new sizes
+		int size = (int) Math.round(getRows() / (double) nworkers);
+		int addedRows = size * (nworkers -1);
+		long[] rowsPerWorker = new long[nworkers];
+		Arrays.fill(rowsPerWorker, 0, nworkers-1, size);
+		rowsPerWorker[nworkers-1] = getRows() - addedRows;
+
+		out = processRandInstructionMatrix(ec, rowsPerWorker, lSeeds);
+
+		// reset runtime seed (e.g., when executed in loop)
+		runtimeSeed = null;
+		return out;
+	}
+
+	private MatrixObject processRandInstructionMatrix(ExecutionContext ec, long[] rowsPerWorker, long[] lSeeds) {
+		long lrows = ec.getScalarInput(rows).getLongValue();
+		long lcols = ec.getScalarInput(cols).getLongValue();
+		checkValidDimensions(lrows, lcols);
+
+		String[] instStrings = new String[rowsPerWorker.length];
+		Arrays.fill(instStrings, 0, rowsPerWorker.length, instString);
+
+		InetAddress addr = null;
+		try {
+			addr = InetAddress.getLocalHost();
+		}
+		catch(UnknownHostException e) {
+			e.printStackTrace();
+		}
+		String host = addr.getHostName();

Review comment:
       Using only localhost here is another argument for why we need to specify a list of federated addresses in the arguments for the instruction, since we need to be able to specify hosts other than localhost.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@systemds.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org