You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/17 11:31:41 UTC
[systemds] branch master updated: [SYSTEMDS-2616] Parallel detect schema
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new cacee11 [SYSTEMDS-2616] Parallel detect schema
cacee11 is described below
commit cacee1131458c97927a23242e9760c038c0e5ddb
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 17 13:30:55 2020 +0200
[SYSTEMDS-2616] Parallel detect schema
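Changes the implementation of detect schema to run in parallel across all
columns, using one sampling task per column. It also renames
FrameBlock.dropInvalid to dropInvalidType, lets isType distinguish INT32
from INT64 and FP32 from FP64, lowers the minimum sample size from 1024 to
256 rows (capped at the row count), and makes dropInvalidType compare only
the type family (FP, INT), updating FrameDropInvalidTypeTest accordingly.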
Closes #1012
---
.../cp/BinaryFrameFrameCPInstruction.java | 2 +-
.../spark/BinaryFrameFrameSPInstruction.java | 2 +-
.../sysds/runtime/matrix/data/FrameBlock.java | 164 +++++++++++++--------
.../functions/frame/FrameDropInvalidTypeTest.java | 30 +++-
4 files changed, 129 insertions(+), 69 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
index 7968b18..8bc8744 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -39,7 +39,7 @@ public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
if(getOpcode().equals("dropInvalidType")) {
// Perform computation using input frames, and produce the result frame
- FrameBlock retBlock = inBlock1.dropInvalid(inBlock2);
+ FrameBlock retBlock = inBlock1.dropInvalidType(inBlock2);
// Attach result frame with FrameBlock associated with output_name
ec.setFrameOutput(output.getName(), retBlock);
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
index 82ca398..6966178 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -95,7 +95,7 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {
@Override
public FrameBlock call(FrameBlock arg0) throws Exception {
- return arg0.dropInvalid(schema_frame);
+ return arg0.dropInvalidType(schema_frame);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 9605380..8a094d0 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -32,10 +32,17 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.sysds.api.DMLException;
import org.apache.sysds.common.Types.ValueType;
@@ -46,12 +53,14 @@ import org.apache.sysds.runtime.instructions.cp.*;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
@SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
-public class FrameBlock implements CacheBlock, Externalizable
-{
+public class FrameBlock implements CacheBlock, Externalizable {
+ private static final Log LOG = LogFactory.getLog(FrameBlock.class.getName());
+
private static final long serialVersionUID = -3993450030207130665L;
public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block
@@ -1869,9 +1878,21 @@ public class FrameBlock implements CacheBlock, Externalizable
val = val.trim().toLowerCase().replaceAll("\"", "");
if (val.matches("(true|false|t|f|0|1)"))
return ValueType.BOOLEAN;
- else if (val.matches("[-+]?\\d+"))
- return ValueType.INT64;
- else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)") || val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
+ else if (val.matches("[-+]?\\d+")){
+ long maxValue = Long.parseLong(val);
+ if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
+ return ValueType.INT32;
+ else
+ return ValueType.INT64;
+ }
+ else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)")){
+ double maxValue = Double.parseDouble(val);
+ if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
+ return ValueType.FP32;
+ else
+ return ValueType.FP64;
+ }
+ else if (val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
return ValueType.FP64;
else return ValueType.STRING;
}
@@ -1880,48 +1901,25 @@ public class FrameBlock implements CacheBlock, Externalizable
int rows = this.getNumRows();
int cols = this.getNumColumns();
String[] schemaInfo = new String[cols];
- int sample = (int)Math.min(Math.max(sampleFraction*rows, 1024), rows);
+ int sample = (int)Math.min(Math.max(sampleFraction*rows, 256), rows);
+
+ ExecutorService pool = CommonThreadPool.get(cols);
+ ArrayList<DetectValueTypeTask> tasks = new ArrayList<>();
for (int i = 0; i < cols; i++) {
- ValueType state = ValueType.UNKNOWN;
- Array obj = this.getColumn(i);
- for (int j = 0; j < sample; j++)
- {
- String dataValue = null;
- //read a not null sample value
- while (dataValue == null) {
- int randomIndex = ThreadLocalRandom.current().nextInt(0, rows - 1);
- dataValue = ((obj.get(randomIndex) != null)?obj.get(randomIndex).toString().trim().replace("\"", "").toLowerCase():null);
- }
+ FrameBlock.Array obj = this.getColumn(i);
+ tasks.add(new DetectValueTypeTask(obj,rows, sample));
+ }
- if (isType(dataValue) == ValueType.STRING) {
- state = ValueType.STRING;
- break;
- }
- else if (isType(dataValue) == ValueType.FP64) {
- if (dataValue.equals("infinity") || dataValue.equals("-infinity") || dataValue.equals("nan")) {
- state = ValueType.FP64;
- }
- else {
- double maxValue = Double.parseDouble(dataValue);
- if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
- state = (state == ValueType.FP64 ? state : ValueType.FP32);
- else
- state = ValueType.FP64;
- }
- }
- else if (isType(dataValue) == ValueType.INT64) {
- long maxValue = Long.parseLong(dataValue);
- if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
- state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
- else
- state = ((state == ValueType.FP64 || state == ValueType.FP32) ? state : ValueType.INT64);
- }
- else if (isType(dataValue) == ValueType.BOOLEAN)
- state = ((new ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, ValueType.INT32)).contains(state)) ? state : ValueType.BOOLEAN);
- else if (isType(dataValue) == ValueType.STRING)
- state = ((new ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, ValueType.INT32, ValueType.BOOLEAN)).contains(state)) ? state : ValueType.STRING);
+ List<Future<String>> ret;
+
+ try {
+ ret = pool.invokeAll(tasks);
+ pool.shutdown();
+ for(int i = 0; i < cols; i++){
+ schemaInfo[i] = ret.get(i).get();
}
- schemaInfo[i] = state.name();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new DMLRuntimeException("Exception Interupted or Exception thrown in Detect Schema", e);
}
//create output block one row representing the schema as strings
@@ -1930,12 +1928,57 @@ public class FrameBlock implements CacheBlock, Externalizable
return fb;
}
+ /** Infers one column's value type from a random row sample; submitted once per column to the thread pool. */
+ private static class DetectValueTypeTask implements Callable<String>
+ {
+ private final Array _obj;
+ private final int _rows;
+ private final int _sampleSize;
+ protected DetectValueTypeTask(Array obj, int rows, int sampleSize ) {
+ _obj = obj;
+ _rows = rows;
+ _sampleSize = sampleSize;
+ }
+ /** @return name of the widest ValueType seen in the sample (UNKNOWN if every sampled cell was null) */
+ @Override
+ public String call() {
+ ValueType state = ValueType.UNKNOWN;
+ for (int j = 0; j < _sampleSize; j++) {
+ // nextInt(bound) excludes bound: every row incl. the last is reachable, and a 1-row frame no longer throws
+ Object cell = _obj.get(ThreadLocalRandom.current().nextInt(_rows));
+ String dataValue = (cell != null) ? cell.toString().trim().replace("\"", "").toLowerCase() : null;
+ if(dataValue != null){
+ ValueType current = isType(dataValue);
+ if (current == ValueType.STRING) {
+ state = ValueType.STRING;
+ break;
+ }
+ else if (current== ValueType.FP64) {
+ state = ValueType.FP64;
+ }
+ else if (current== ValueType.FP32) {
+ state = (state == ValueType.FP64 ? state : ValueType.FP32);
+ }
+ else if (current == ValueType.INT64) {
+ state = ((state == ValueType.FP64 || state == ValueType.FP32) ? state : ValueType.INT64);
+ }
+ else if (current == ValueType.INT32) {
+ state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
+ }
+ else if (current == ValueType.BOOLEAN)
+ state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64 || state == ValueType.INT32) ? state : ValueType.BOOLEAN);
+ }
+ }
+ return state.name();
+ }
+ }
+
/**
* Drop the cell value which does not confirms to the data type of its column
* @param schema of the frame
* @return original frame where invalid values are replaced with null
*/
- public FrameBlock dropInvalid(FrameBlock schema) {
+ public FrameBlock dropInvalidType(FrameBlock schema) {
//sanity checks
if(this.getNumColumns() != schema.getNumColumns())
throw new DMLException("mismatch in number of columns in frame and its schema ");
@@ -1943,6 +1986,19 @@ public class FrameBlock implements CacheBlock, Externalizable
String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
for (int i = 0; i < this.getNumColumns(); i++) {
Array obj = this.getColumn(i);
+ String schemaCol = schemaString[i];
+ String type;
+ if(schemaCol.contains("FP")){
+ type = "FP";
+ } else if (schemaCol.contains("INT")){
+ type = "INT";
+ } else if (schemaCol.contains("STRING")){
+ // In case of String columns, don't do any verification or replacements.
+ break;
+ } else{
+ type = schemaCol;
+ }
+
for (int j = 0; j < this.getNumRows(); j++)
{
if(obj.get(j) == null)
@@ -1950,21 +2006,11 @@ public class FrameBlock implements CacheBlock, Externalizable
String dataValue = obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;
ValueType dataType = isType(dataValue);
- if (dataType== ValueType.FP64 && schemaString[i].trim().equals("FP32")) {
- double maxValue = Double.parseDouble(dataValue);
- if ((maxValue < (-Float.MAX_VALUE)) || (maxValue > Float.MAX_VALUE))
- this.set(j,i,null);
- }
- else if (dataType== ValueType.INT64 && schemaString[i].trim().equals("INT32")) {
- long maxValue = Long.parseLong(dataValue);
- if ((maxValue < Integer.MIN_VALUE) || (maxValue > Integer.MAX_VALUE))
- this.set(j,i,null);
- }
- else if(dataType == ValueType.BOOLEAN && schemaString[i].trim().equals("INT32")
- && ((Integer.parseInt(dataValue) == 1 || Integer.parseInt(dataValue) == 0)))
- continue;
- else if (!dataType.toString().equals(schemaString[i].trim()))
+ if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && type == "INT")){
+ LOG.warn("Datatype detected: " + dataType + " where expected: " + schemaString[i] + " index: " + i + "," +j);
+
this.set(j,i,null);
+ }
}
}
return this;
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
index d70b255..841c61e 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
@@ -68,45 +68,59 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase
@Test
public void testDoubleinStringCP() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.CP);
+ // This test now verifies floating points are okay in string columns
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.CP, true);
}
@Test
public void testDoubleinStringSpark() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.SPARK);
+ // This test now verifies floating points are okay in string columns
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.SPARK, true);
}
@Test
public void testStringInDouble() {
+ // This test now verifies strings are removed in float columns
runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.CP);
}
@Test
public void testStringInDoubleSpark() {
+ // This test now verifies strings are removed in float columns
runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.SPARK);
}
@Test
public void testDoubleInFloat() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.CP);
+ // This test now verifies that changing from FP64 to FP32 is okay.
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.CP,true);
}
@Test
public void testDoubleInFloatSpark() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.SPARK);
+ // This test now verifies that changing from FP64 to FP32 is okay.
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.SPARK, true);
}
@Test
public void testLongInInt() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.CP);
+ // This test now verifies that changing from INT32 to INT64 is okay.
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.CP, true);
}
@Test
public void testLongInIntSpark() {
- runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.SPARK);
+ // This test now verifies that changing from INT32 to INT64 is okay.
+ runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.SPARK, true);
}
+
+ /** Strict variant: delegates with ignore=false, so the output must contain exactly badValues nulls. */
+ private void runIsCorrectTest(ValueType[] schema, int rows, int cols, int badValues, int test, LopProperties.ExecType et) {
+ runIsCorrectTest(schema, rows, cols, badValues, test, et, /* ignore */ false);
+ }
+
private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
- int badValues, int test, LopProperties.ExecType et)
+ int badValues, int test, LopProperties.ExecType et, boolean ignore)
{
Types.ExecMode platformOld = setExecMode(et);
boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
@@ -182,7 +196,7 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase
int nullNum = Math.toIntExact(data.stream().filter(s -> s == null).count());
//verify output schema
- Assert.assertEquals("Wrong result: " + nullNum + ".", badValues, nullNum);
+ Assert.assertEquals("Wrong result: " + nullNum + ".", ignore ? 0 : badValues, nullNum);
}
catch (Exception ex) {
throw new RuntimeException(ex);