You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2020/08/16 16:37:07 UTC

[systemds] branch master updated: [SYSTEMDS-2581] Serialization, deserialization of dedup DAGs

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e13392  [SYSTEMDS-2581] Serialization, deserialization of dedup DAGs
4e13392 is described below

commit 4e1339222c1c69c0d0f46f1d7bdf651da4b3a325
Author: arnabp <ar...@tugraz.at>
AuthorDate: Mon Aug 10 16:37:14 2020 +0200

    [SYSTEMDS-2581] Serialization, deserialization of dedup DAGs
    
    This patch includes:
     - a new approach to hook the dedup patches to the main trace.
       Now the dedup opcode also encodes details needed for mapping
       the item back to the correct dedup patch after deserialization.
     - Two methods to serialize and deserialize the compressed dedup
       DAGs. Serialization logic merges all the patches in a single
       string and maintains headers with all the information.
       Deserialization logic parses the headers to map the DAGs to
       the right dedup entries from the main trace.
---
 .../runtime/controlprogram/ForProgramBlock.java    |  2 +-
 .../runtime/controlprogram/WhileProgramBlock.java  |  2 +-
 .../org/apache/sysds/runtime/lineage/Lineage.java  | 12 ++++++--
 .../sysds/runtime/lineage/LineageDedupBlock.java   |  4 +++
 .../sysds/runtime/lineage/LineageDedupUtils.java   | 11 +++++--
 .../apache/sysds/runtime/lineage/LineageItem.java  |  4 +++
 .../sysds/runtime/lineage/LineageItemUtils.java    | 35 +++++++++++++++++++++-
 .../apache/sysds/runtime/lineage/LineageMap.java   | 23 ++++++++++----
 .../sysds/runtime/lineage/LineageParser.java       | 32 +++++++++++++++++++-
 src/main/java/org/apache/sysds/utils/Explain.java  |  3 ++
 .../functions/lineage/LineageTraceDedup10.dml      |  2 +-
 11 files changed, 115 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/ForProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/ForProgramBlock.java
index c8b86bb..4f6f552 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/ForProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/ForProgramBlock.java
@@ -143,7 +143,7 @@ public class ForProgramBlock extends ProgramBlock
 				if (DMLScript.LINEAGE_DEDUP) {
 					LineageDedupUtils.replaceLineage(ec);
 					// hook the dedup map to the main lineage trace
-					ec.getLineage().traceCurrentDedupPath();
+					ec.getLineage().traceCurrentDedupPath(this, ec);
 				}
 			}
 			
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/WhileProgramBlock.java b/src/main/java/org/apache/sysds/runtime/controlprogram/WhileProgramBlock.java
index a6144df..234f58f 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/WhileProgramBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/WhileProgramBlock.java
@@ -120,7 +120,7 @@ public class WhileProgramBlock extends ProgramBlock
 				if (DMLScript.LINEAGE_DEDUP) {
 					LineageDedupUtils.replaceLineage(ec);
 					// hook the dedup map to the main lineage trace
-					ec.getLineage().traceCurrentDedupPath();
+					ec.getLineage().traceCurrentDedupPath(this, ec);
 				}
 			}
 			
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/Lineage.java b/src/main/java/org/apache/sysds/runtime/lineage/Lineage.java
index f1a95c0..8ee2e2b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/Lineage.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/Lineage.java
@@ -28,6 +28,7 @@ import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -56,14 +57,15 @@ public class Lineage {
 			_map.trace(inst, ec);
 	}
 	
-	public void traceCurrentDedupPath() {
+	public void traceCurrentDedupPath(ProgramBlock pb, ExecutionContext ec) { // hook the current dedup path into the main trace
 		if( _activeDedupBlock != null ) {
+			ArrayList<String> inputnames = pb.getStatementBlock().getInputstoSB(); // inputs of the loop's statement block
+			LineageItem[] liinputs = LineageItemUtils.getLineageItemInputstoSB(inputnames, ec); // their lineage items, used as inputs of the dedup items
 			long lpath = _activeDedupBlock.getPath();
 			LineageDedupUtils.setDedupMap(_activeDedupBlock, lpath);
 			LineageMap lm = _activeDedupBlock.getMap(lpath);
 			if (lm != null)
-				_map.processDedupItem(lm, lpath);
-			
+				_map.processDedupItem(lm, lpath, liinputs, pb.getStatementBlock().getName()); // SB name is encoded in the dedup items' opcodes
 		}
 	}
 	
@@ -91,6 +93,10 @@ public class Lineage {
 		return _map;
 	}
 	
+	public Map<ProgramBlock, LineageDedupBlock> getDedupBlocks() {
+		return _dedupBlocks; // the live internal map, not a copy
+	}
+	
 	public void set(String varName, LineageItem li) {
 		_map.set(varName, li);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupBlock.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupBlock.java
index 7b2194e..074c25c 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupBlock.java
@@ -51,6 +51,10 @@ public class LineageDedupBlock {
 		return _distinctPaths.containsKey(path) ? _distinctPaths.get(path) : null;
 	}
 	
+	public Map<Long, LineageMap> getPathMaps() {
+		return _distinctPaths; // live internal map: taken path -> traced LineageMap (not a copy)
+	}
+	
 	public void setMap(Long takenPath, LineageMap tracedMap) {
 		_distinctPaths.put(takenPath, new LineageMap(tracedMap));
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
index 41694b2..6acc248 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageDedupUtils.java
@@ -100,8 +100,15 @@ public class LineageDedupUtils {
 		ArrayList<String> inputnames = fpb.getStatementBlock().getInputstoSB();
 		LineageItem[] liinputs = LineageItemUtils.getLineageItemInputstoSB(inputnames, ec);
 		// TODO: find the inputs from the ProgramBlock instead of StatementBlock
-		for (int i=0; i<liinputs.length; i++)
-			_tmpLineage.set(inputnames.get(i), liinputs[i]);
+		String ph = LineageItemUtils.LPLACEHOLDER;
+		for (int i=0; i<liinputs.length; i++) {
+			// Wrap the inputs with order-preserving placeholders.
+			// An alternative way would be to replace the non-literal leaves with 
+			// placeholders after each iteration, but that requires a full DAG
+			// traversal after each iteration.
+			LineageItem phInput = new LineageItem(ph+String.valueOf(i), new LineageItem[] {liinputs[i]});
+			_tmpLineage.set(inputnames.get(i), phInput);
+		}
 		// also copy the dedupblock to trace the taken path (bitset)
 		_tmpLineage.setDedupBlock(ldb);
 		// attach the lineage object to the execution context
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
index 2a16baa..2a9891a 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItem.java
@@ -71,6 +71,10 @@ public class LineageItem {
 	public LineageItem(long id, LineageItem li) {
 		this(id, li._data, li._opcode, li._inputs);
 	}
+
+	public LineageItem(long id, String data, String opcode) { // leaf item: inputs are null
+		this(id, data, opcode, null);
+	}
 	
 	public LineageItem(long id, String data, String opcode, LineageItem[] inputs) {
 		_id = id;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index c7857a8..9a96828 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -65,6 +65,7 @@ import org.apache.sysds.parser.Statement;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.BasicProgramBlock;
 import org.apache.sysds.runtime.controlprogram.Program;
+import org.apache.sysds.runtime.controlprogram.ProgramBlock;
 import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContextFactory;
@@ -80,6 +81,7 @@ import org.apache.sysds.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysds.runtime.instructions.spark.SPInstruction.SPType;
 import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType;
 import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.utils.Explain;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -96,7 +98,8 @@ import java.util.stream.Collectors;
 public class LineageItemUtils {
 	
 	private static final String LVARPREFIX = "lvar";
-	private static final String LPLACEHOLDER = "IN#";
+	public static final String LPLACEHOLDER = "IN#";
+	public static final String DEDUP_DELIM = "_";
 	
 	public static LineageItemType getType(String str) {
 		if (str.length() == 1) {
@@ -651,6 +654,36 @@ public class LineageItemUtils {
 		item.setVisited();
 	}
 	
+	public static String mergeExplainDedupBlocks(ExecutionContext ec) {
+		// Serializes all dedup patches into one string. For every DAG root of
+		// every traced path of every dedup'ed loop it emits a header line
+		//   patch<D><rootName><D><sbName><D><pathId>   (D = DEDUP_DELIM)
+		// followed by the explained DAG and one more line break. The header
+		// carries everything the deserialization logic needs.
+		Map<ProgramBlock, LineageDedupBlock> dedupBlocks = ec.getLineage().getDedupBlocks();
+		StringBuilder out = new StringBuilder();
+		for (Map.Entry<ProgramBlock, LineageDedupBlock> loop : dedupBlocks.entrySet()) {
+			LineageDedupBlock dedup = loop.getValue();
+			if (dedup == null)
+				continue; // no dedup block recorded for this loop
+			String sbName = loop.getKey().getStatementBlock().getName();
+			for (Map.Entry<Long, LineageMap> path : dedup.getPathMaps().entrySet()) {
+				Long pathId = path.getKey();
+				for (Map.Entry<String, LineageItem> root : path.getValue().getTraces().entrySet()) {
+					// header line
+					out.append("patch").append(DEDUP_DELIM)
+						.append(root.getKey()).append(DEDUP_DELIM)
+						.append(sbName).append(DEDUP_DELIM)
+						.append(pathId).append("\n");
+					// body: the explained DAG, then one more line break
+					out.append(Explain.explain(root.getValue()));
+					out.append("\n");
+				}
+			}
+		}
+		return out.toString();
+	}
+	
 	public static LineageItem replace(LineageItem root, LineageItem liOld, LineageItem liNew) {
 		if( liNew == null )
 			throw new DMLRuntimeException("Invalid null lineage item for "+liOld.getId());
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
index 51b3d23..fdcadff 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
@@ -81,6 +81,18 @@ public class LineageMap {
 			}
 		}
 	}
+
+	public void processDedupItem(LineageMap lm, Long path, LineageItem[] liinputs, String name) {
+		// Hooks one dedup item per traced variable of the patch into this map.
+		// The opcode joins dedupItemOpcode, the variable name, the statement
+		// block name and the path id with DEDUP_DELIM; the deserialization
+		// logic needs exactly this to map the item back to the right patch.
+		for (String var : lm._traces.keySet()) {
+			String opcode = String.join(LineageItemUtils.DEDUP_DELIM,
+				LineageItem.dedupItemOpcode, var, name, path.toString());
+			addLineageItem(Pair.of(var, new LineageItem(opcode, liinputs)));
+		}
+	}
 	
 	public LineageItem getOrCreate(CPOperand variable) {
 		if (variable == null)
@@ -237,11 +249,12 @@ public class LineageMap {
 		String fName = ec.getScalarInput(input2.getName(), Types.ValueType.STRING, input2.isLiteral()).getStringValue();
 		
 		if (DMLScript.LINEAGE_DEDUP) {
-			LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage.dedup");
-			//li = LineageItemUtils.rDecompress(li);
-			// TODO:gracefully serialize the dedup maps without decompressing
+			// gracefully serialize the dedup maps without decompressing
+			LineageItemUtils.writeTraceToHDFS(LineageItemUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup");
+			// sample code to deserialize the dedup patches
+			//String allDedup = LineageItemUtils.mergeExplainDedupBlocks(ec);
+			//LineageItem tmp = LineageParser.parseLineageTraceDedup(allDedup);
 		}
-		else
-			LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage");
+		LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage");
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
index 92cfa2c..c70cfdd 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
@@ -89,11 +89,18 @@ public class LineageParser
 	}
 	
 	private static LineageItem parseLineageInstruction(Long id, String str, Map<Long, LineageItem> map, String name) {
-		ArrayList<LineageItem> inputs = new ArrayList<>();
 		String[] tokens = str.split(" ");
 		if (tokens.length < 2)
 			throw new ParseException("Invalid length ot lineage item "+tokens.length+".");
 		String opcode = tokens[0];
+
+		if (opcode.startsWith(LineageItemUtils.LPLACEHOLDER)) {
+			// Convert this to a leaf node (creation type)
+			String data = opcode;
+			return new LineageItem(id, data, "Create"+opcode);
+		}
+
+		ArrayList<LineageItem> inputs = new ArrayList<>();
 		for( int i=1; i<tokens.length; i++ ) {
 			String token = tokens[i];
 			if (token.startsWith("(") && token.endsWith(")")) {
@@ -104,4 +111,27 @@ public class LineageParser
 		}
 		return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0]));
 	}
+	
+	public static LineageItem parseLineageTraceDedup(String str) {
+		LineageItem li = null; // TODO: stitch the patches into the main trace
+		Map<Long, Map<String, LineageItem>> patchLiMap = new HashMap<>();
+		String d = LineageItemUtils.DEDUP_DELIM;
+		// Strings are immutable: use the normalized copy (all line ends -> \n)
+		String[] allPatches = str.replaceAll("\r\n|\r", "\n").split("\n\n");
+		for (String patch : allPatches) {
+			String[] headBody = patch.trim().split("\n", 2);
+			if (headBody.length < 2) continue; // skip empty or header-only segments
+			// Header: patch<d><root><d><sbName><d><pathId>, e.g. patch_R_SB15_1. The root
+			// (a variable name) may contain d itself (sbName assumed not to): parse from the right.
+			String head = headBody[0];
+			int pathPos = head.lastIndexOf(d);
+			int sbPos = head.lastIndexOf(d, pathPos - 1);
+			String rootName = head.substring(head.indexOf(d) + d.length(), sbPos);
+			Long pathId = Long.parseLong(head.substring(pathPos + d.length()));
+			// Deserialize the patch; map (pathID, DAG root name) to its DAG.
+			patchLiMap.computeIfAbsent(pathId, k -> new HashMap<>()).put(rootName, parseLineageTrace(headBody[1]));
+			// TODO: handle multiple loops (sbName is not part of the key yet)
+		}
+		return li;
+	}
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sysds/utils/Explain.java b/src/main/java/org/apache/sysds/utils/Explain.java
index 484f9b4..c232595 100644
--- a/src/main/java/org/apache/sysds/utils/Explain.java
+++ b/src/main/java/org/apache/sysds/utils/Explain.java
@@ -67,6 +67,7 @@ import org.apache.sysds.runtime.instructions.spark.CheckpointSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.ReblockSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.SPInstruction;
 import org.apache.sysds.runtime.lineage.LineageItem;
+import org.apache.sysds.runtime.lineage.LineageItemUtils;
 
 public class Explain
 {
@@ -621,6 +622,8 @@ public class Explain
 			}
 			//check ascent condition - append item
 			else if( tmpItem.getInputs() == null 
+				|| tmpItem.getOpcode().startsWith(LineageItemUtils.LPLACEHOLDER)
+				// don't trace beyond if a placeholder is found
 				|| tmpItem.getInputs().length <= tmpPos.intValue() ) {
 				sb.append(createOffset(level));
 				sb.append(tmpItem.toString());
diff --git a/src/test/scripts/functions/lineage/LineageTraceDedup10.dml b/src/test/scripts/functions/lineage/LineageTraceDedup10.dml
index 9f3205b..0833a52 100644
--- a/src/test/scripts/functions/lineage/LineageTraceDedup10.dml
+++ b/src/test/scripts/functions/lineage/LineageTraceDedup10.dml
@@ -26,7 +26,7 @@ R = X;
 for (i in 1:4) {
   R = R + 1/2;
   if (i %% 2 == 0)
-    R = R * 3;
+    R = R * 3*i;
   R = R - 5;
 }