You are viewing a plain-text version of this content; the canonical version is the original message in the commits@systemds.apache.org mailing-list archive.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/08/17 11:31:41 UTC

[systemds] branch master updated: [SYSTEMDS-2616] Parallel detect schema

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new cacee11  [SYSTEMDS-2616] Parallel detect schema
cacee11 is described below

commit cacee1131458c97927a23242e9760c038c0e5ddb
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 17 13:30:55 2020 +0200

    [SYSTEMDS-2616] Parallel detect schema
    
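    Changes the implementation of detect schema to run in
    parallel across all columns. The commit also renames
    FrameBlock.dropInvalid to dropInvalidType, which now accepts
    either width for FP32/FP64 and INT32/INT64 columns, accepts
    boolean values in INT columns, and skips STRING columns.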
    
    Closes #1012
---
 .../cp/BinaryFrameFrameCPInstruction.java          |   2 +-
 .../spark/BinaryFrameFrameSPInstruction.java       |   2 +-
 .../sysds/runtime/matrix/data/FrameBlock.java      | 164 +++++++++++++--------
 .../functions/frame/FrameDropInvalidTypeTest.java  |  30 +++-
 4 files changed, 129 insertions(+), 69 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
index 7968b18..8bc8744 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryFrameFrameCPInstruction.java
@@ -39,7 +39,7 @@ public class BinaryFrameFrameCPInstruction extends BinaryCPInstruction
 		
 		if(getOpcode().equals("dropInvalidType")) {
 			// Perform computation using input frames, and produce the result frame
-			FrameBlock retBlock = inBlock1.dropInvalid(inBlock2);
+			FrameBlock retBlock = inBlock1.dropInvalidType(inBlock2); // nulls, in place, the cells of inBlock1 that do not match the column types listed in inBlock2
 			// Attach result frame with FrameBlock associated with output_name
 			ec.setFrameOutput(output.getName(), retBlock);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
index 82ca398..6966178 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java
@@ -95,7 +95,7 @@ public class BinaryFrameFrameSPInstruction extends BinarySPInstruction {

 		@Override
 		public FrameBlock call(FrameBlock arg0) throws Exception {
-			return arg0.dropInvalid(schema_frame);
+			return arg0.dropInvalidType(schema_frame); // mutates and returns arg0 itself: non-conforming cells become null
 		}
 	}

diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
index 9605380..8a094d0 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/FrameBlock.java
@@ -32,10 +32,17 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.sysds.api.DMLException;
 import org.apache.sysds.common.Types.ValueType;
@@ -46,12 +53,14 @@ import org.apache.sysds.runtime.instructions.cp.*;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.transform.encode.EncoderRecode;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;

 @SuppressWarnings({"rawtypes","unchecked"}) //allow generic native arrays
-public class FrameBlock implements CacheBlock, Externalizable  
-{
+public class FrameBlock implements CacheBlock, Externalizable  {
+	private static final Log LOG = LogFactory.getLog(FrameBlock.class.getName()); // reports cells that dropInvalidType sets to null
+	
 	private static final long serialVersionUID = -3993450030207130665L;
 	
 	public static final int BUFFER_SIZE = 1 * 1000 * 1000; //1M elements, size of default matrix block 
@@ -1869,9 +1878,21 @@ public class FrameBlock implements CacheBlock, Externalizable
 		val = val.trim().toLowerCase().replaceAll("\"",  "");
 		if (val.matches("(true|false|t|f|0|1)"))
 			return ValueType.BOOLEAN;
-		else if (val.matches("[-+]?\\d+"))
-			return ValueType.INT64;
-		else if (val.matches("[-+]?[0-9]+\\.?[0-9]*([e]?[-+]?[0-9]+)") || val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
+		else if (val.matches("[-+]?\\d+")){
+			// BigInteger instead of Long.parseLong, which throws for digit strings beyond the long range
+			int bits = new java.math.BigInteger(val).bitLength(); // bit count excluding the sign bit
+			if (bits < Integer.SIZE)
+				return ValueType.INT32;
+			return (bits < Long.SIZE) ? ValueType.INT64 : ValueType.FP64; // too large for any integer type
+		}
+		else if (val.matches("[-+]?[0-9]+\\.?[0-9]*(e[-+]?)?[0-9]+")){ // a sign is only valid after 'e', so e.g. "2020-08" is a STRING, not a parse error
+			double maxValue = Double.parseDouble(val);
+			if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
+				return ValueType.FP32;
+			else
+				return ValueType.FP64;
+		}
+		else if (val.equals("infinity") || val.equals("-infinity") || val.equals("nan"))
 			return ValueType.FP64;
 		else return ValueType.STRING;
 	}
@@ -1880,48 +1901,25 @@ public class FrameBlock implements CacheBlock, Externalizable
 		int rows = this.getNumRows();
 		int cols = this.getNumColumns();
 		String[] schemaInfo = new String[cols];
-		int sample = (int)Math.min(Math.max(sampleFraction*rows, 1024), rows);
+		int sample = (int)Math.min(Math.max(sampleFraction*rows, 256), rows);
+
+		ExecutorService pool = CommonThreadPool.get(cols);
+		ArrayList<DetectValueTypeTask> tasks = new ArrayList<>(cols);
 		for (int i = 0; i < cols; i++) {
-			ValueType state = ValueType.UNKNOWN;
-			Array obj = this.getColumn(i);
-			for (int j = 0; j < sample; j++)
-			{
-				String dataValue = null;
-				//read a not null sample value
-				while (dataValue == null) {
-					int randomIndex = ThreadLocalRandom.current().nextInt(0, rows - 1);
-					dataValue = ((obj.get(randomIndex) != null)?obj.get(randomIndex).toString().trim().replace("\"", "").toLowerCase():null);
-				}
+			FrameBlock.Array obj = this.getColumn(i);
+			tasks.add(new DetectValueTypeTask(obj,rows, sample));
+		}

-				if (isType(dataValue) == ValueType.STRING) {
-					state = ValueType.STRING;
-					break;
-				}
-				else if (isType(dataValue) == ValueType.FP64) {
-					if (dataValue.equals("infinity") || dataValue.equals("-infinity") || dataValue.equals("nan")) {
-						state = ValueType.FP64;
-					}
-					else {
-						double maxValue = Double.parseDouble(dataValue);
-						if ((maxValue >= (-Float.MAX_VALUE)) && (maxValue <= Float.MAX_VALUE))
-							state = (state == ValueType.FP64 ? state : ValueType.FP32);
-						else
-							state = ValueType.FP64;
-					}
-				}
-				else if (isType(dataValue) == ValueType.INT64) {
-					long maxValue = Long.parseLong(dataValue);
-					if ((maxValue >= Integer.MIN_VALUE) && (maxValue <= Integer.MAX_VALUE))
-						state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
-					else
-						state = ((state == ValueType.FP64  || state == ValueType.FP32) ? state : ValueType.INT64);
-				}
-				else if (isType(dataValue) == ValueType.BOOLEAN)
-					state = ((new ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, ValueType.INT32)).contains(state)) ? state : ValueType.BOOLEAN);
-				else if (isType(dataValue) == ValueType.STRING)
-					state = ((new ArrayList<>(Arrays.asList(ValueType.FP64, ValueType.FP32, ValueType.INT64, ValueType.INT32, ValueType.BOOLEAN)).contains(state)) ? state : ValueType.STRING);
+		try {
+			List<Future<String>> ret = pool.invokeAll(tasks);
+			for(int i = 0; i < cols; i++){
+				schemaInfo[i] = ret.get(i).get();
 			}
-			schemaInfo[i] = state.name();
+		} catch (ExecutionException | InterruptedException e) {
+			if(e instanceof InterruptedException) Thread.currentThread().interrupt(); // keep the caller's interrupt status
+			throw new DMLRuntimeException("Detect schema was interrupted or a column task threw an exception", e);
+		} finally {
+			pool.shutdown(); // also reached when a task fails or the wait is interrupted
 		}

 		//create output block one row representing the schema as strings
@@ -1930,12 +1928,57 @@ public class FrameBlock implements CacheBlock, Externalizable
 		return fb;
 	}

+	/**
+	 * Infers the value type of one column from a random sample (with replacement)
+	 * of its cells, skipping nulls. call() returns the name of the most general type
+	 * seen (STRING > FP64 > FP32 > INT64 > INT32 > BOOLEAN), or UNKNOWN if none.
+	 */
+	private static class DetectValueTypeTask implements Callable<String>
+	{
+		private final Array _obj;
+		private final int _rows;
+		private final int _sampleSize;
+
+		protected DetectValueTypeTask(Array obj, int rows, int sampleSize ) {
+			_obj = obj;
+			_rows = rows;
+			_sampleSize = sampleSize;
+		}
+
+		@Override
+		public String call() {
+			ValueType state = ValueType.UNKNOWN;
+			for (int j = 0; j < _sampleSize; j++) {
+				// nextInt(bound) excludes bound: covers the last row and, unlike nextInt(0, _rows - 1), does not throw when _rows == 1
+				Object cell = _obj.get(ThreadLocalRandom.current().nextInt(_rows));
+				if(cell == null)
+					continue; // nulls carry no type information
+				ValueType current = isType(cell.toString().trim().replace("\"", "").toLowerCase());
+				if (current == ValueType.STRING) {
+					state = ValueType.STRING;
+					break; // most general type, further samples cannot change the result
+				}
+				else if (current == ValueType.FP64)
+					state = ValueType.FP64;
+				else if (current == ValueType.FP32)
+					state = (state == ValueType.FP64 ? state : ValueType.FP32);
+				else if (current == ValueType.INT64)
+					state = ((state == ValueType.FP64 || state == ValueType.FP32) ? state : ValueType.INT64);
+				else if (current == ValueType.INT32)
+					state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64) ? state : ValueType.INT32);
+				else if (current == ValueType.BOOLEAN)
+					state = ((state == ValueType.FP64 || state == ValueType.FP32 || state == ValueType.INT64 || state == ValueType.INT32) ? state : ValueType.BOOLEAN);
+			}
+			return state.name();
+		}
+	}
+
 	/**
 	 * Drop the cell value which does not confirms to the data type of its column
 	 * @param schema of the frame
 	 * @return original frame where invalid values are replaced with null
 	 */
-	public FrameBlock dropInvalid(FrameBlock schema) {
+	public FrameBlock dropInvalidType(FrameBlock schema) {
 		//sanity checks
 		if(this.getNumColumns() != schema.getNumColumns())
 			throw new DMLException("mismatch in number of columns in frame and its schema ");
@@ -1943,6 +1986,19 @@ public class FrameBlock implements CacheBlock, Externalizable
 		String[] schemaString = schema.getStringRowIterator().next(); // extract the schema in String array
 		for (int i = 0; i < this.getNumColumns(); i++) {
 			Array obj = this.getColumn(i);
+			String schemaCol = schemaString[i].trim(); // tolerate whitespace around type names in the schema frame
+			String type;
+			if(schemaCol.contains("FP")){
+				type = "FP"; // FP32 and FP64 columns accept floating point values of either width
+			} else if (schemaCol.contains("INT")){
+				type = "INT"; // INT32 and INT64 columns accept integer values of either width
+			} else if (schemaCol.contains("STRING")){
+				// In case of String columns, don't do any verification or replacements.
+				continue; // skip only this column; the remaining columns must still be checked
+			} else{
+				type = schemaCol;
+			}
+
 			for (int j = 0; j < this.getNumRows(); j++)
 			{
 				if(obj.get(j) == null)
@@ -1950,21 +2006,11 @@ public class FrameBlock implements CacheBlock, Externalizable
 				String dataValue = obj.get(j).toString().trim().replace("\"", "").toLowerCase() ;

 				ValueType dataType = isType(dataValue);
-				 if (dataType== ValueType.FP64 && schemaString[i].trim().equals("FP32")) {
-					 double maxValue = Double.parseDouble(dataValue);
-					 if ((maxValue < (-Float.MAX_VALUE)) || (maxValue > Float.MAX_VALUE))
-						 this.set(j,i,null);
-				}
-				else if (dataType== ValueType.INT64 && schemaString[i].trim().equals("INT32")) {
-					 long maxValue = Long.parseLong(dataValue);
-					 if ((maxValue < Integer.MIN_VALUE) || (maxValue > Integer.MAX_VALUE))
-						 this.set(j,i,null);
-				}
-				else if(dataType == ValueType.BOOLEAN && schemaString[i].trim().equals("INT32")
-						 && ((Integer.parseInt(dataValue) == 1 || Integer.parseInt(dataValue) == 0)))
-					continue;
-				else if (!dataType.toString().equals(schemaString[i].trim()))
+				// isType reports "0"/"1" as BOOLEAN, so BOOLEAN is accepted in INT columns. NOTE(review): INT values in FP columns are dropped - confirm intended
+				if(!dataType.toString().contains(type) && !(dataType == ValueType.BOOLEAN && "INT".equals(type))){
+					LOG.warn("Datatype detected: " + dataType + " where expected: " + schemaString[i] + " index: " + i + "," +j);
 					this.set(j,i,null);
+				}
 			}
 		}
 		return this;
diff --git a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
index d70b255..841c61e 100644
--- a/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/frame/FrameDropInvalidTypeTest.java
@@ -68,45 +68,59 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase

 	@Test
 	public void testDoubleinStringCP() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.CP);
+		// STRING columns are not validated, so floating point values in them must be kept
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.CP, true);
 	}

 	@Test
 	public void testDoubleinStringSpark() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.SPARK);
+		// STRING columns are not validated, so floating point values in them must be kept
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 1, LopProperties.ExecType.SPARK, true);
 	}

 	@Test
 	public void testStringInDouble() {
+		// string values in floating point columns must be dropped
 		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.CP);
 	}

 	@Test
 	public void testStringInDoubleSpark() {
+		// string values in floating point columns must be dropped
 		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 3, 2, LopProperties.ExecType.SPARK);
 	}

 	@Test
 	public void testDoubleInFloat() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.CP);
+		// FP64 values are accepted in FP32 columns, so nothing may be dropped
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.CP, true);
 	}

 	@Test
 	public void testDoubleInFloatSpark() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.SPARK);
+		// FP64 values are accepted in FP32 columns, so nothing may be dropped
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 3, LopProperties.ExecType.SPARK, true);
 	}

 	@Test
 	public void testLongInInt() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.CP);
+		// INT64 values are accepted in INT32 columns, so nothing may be dropped
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.CP, true);
 	}

 	@Test
 	public void testLongInIntSpark() {
-		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.SPARK);
+		// INT64 values are accepted in INT32 columns, so nothing may be dropped
+		runIsCorrectTest(schemaStrings, rows, schemaStrings.length, 5, 4, LopProperties.ExecType.SPARK, true);
 	}
+
+	private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
+		int badValues, int test, LopProperties.ExecType et){
+		runIsCorrectTest(schema, rows, cols, badValues, test, et, false);
+	}
+
 	private void runIsCorrectTest(ValueType[] schema, int rows, int cols,
-		int badValues, int test, LopProperties.ExecType et)
+		int badValues, int test, LopProperties.ExecType et, boolean expectNoDrops)
 	{
 		Types.ExecMode platformOld = setExecMode(et);
 		boolean oldFlag = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
@@ -182,7 +196,7 @@ public class FrameDropInvalidTypeTest extends AutomatedTestBase

 			int nullNum = Math.toIntExact(data.stream().filter(s -> s == null).count());
 			//verify output schema
-			Assert.assertEquals("Wrong result: " + nullNum + ".", badValues, nullNum);
+			Assert.assertEquals("Wrong result: " + nullNum + ".", expectNoDrops ? 0 : badValues, nullNum);
 		}
 		catch (Exception ex) {
 			throw new RuntimeException(ex);