Posted to commits@systemds.apache.org by mb...@apache.org on 2020/07/28 19:03:46 UTC

[systemds] branch master updated: [SYSTEMDS-2593] Entity resolution pipelines and primitives.

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new ee77fad  [SYSTEMDS-2593] Entity resolution pipelines and primitives.
ee77fad is described below

commit ee77fadbb569d93f6738bb7b6083568a35cb3790
Author: Samuel Kogler <sa...@gmail.com>
AuthorDate: Tue Jul 28 20:45:46 2020 +0200

    [SYSTEMDS-2593] Entity resolution pipelines and primitives.
    
    Adds new scripts in `scripts/staging/entity-resolution` that demonstrate
    entity clustering and binary entity resolution with SystemDS DML.
    See the README at `scripts/staging/entity-resolution/README.md` for more
    details.
    
    This is a squash of all commits on branch master from the
    skogler/systemml fork.
    
    AMLS project SS2020.
    Closes #993.
    
    Co-authored-by: Markus Reiter-Haas <is...@gmail.com>
---
 .github/workflows/applicationTests.yml             |   2 +-
 dev/Tasks-obsolete.txt                             |   1 +
 docs/index.md                                      |   1 +
 docs/site/entity-resolution.md                     | 137 ++++++++++
 .../entity-resolution/binary-entity-resolution.dml | 102 +++++++
 .../entity-resolution/entity-clustering.dml        | 117 +++++++++
 .../entity-resolution/eval-entity-resolution.dml   |  59 +++++
 .../entity-resolution/primitives/blocking.dml      | 292 +++++++++++++++++++++
 .../entity-resolution/primitives/clustering.dml    | 158 +++++++++++
 .../entity-resolution/primitives/evaluation.dml    | 142 ++++++++++
 .../entity-resolution/primitives/matching.dml      | 108 ++++++++
 .../entity-resolution/primitives/pipeline.dml      | 220 ++++++++++++++++
 .../primitives/postprocessing.dml                  |  80 ++++++
 .../entity-resolution/primitives/preprocessing.dml |  85 ++++++
 .../sysds/hops/recompile/LiteralReplacement.java   |  28 +-
 .../controlprogram/caching/CacheableData.java      |   8 +
 .../controlprogram/caching/MatrixObject.java       |   8 -
 .../applications/EntityResolutionBinaryTest.java   |  74 ++++++
 .../applications/EntityResolutionBlockingTest.java |  81 ++++++
 .../EntityResolutionClusteringTest.java            | 116 ++++++++
 .../EntityResolutionConnectedComponentsTest.java   | 184 +++++++++++++
 .../entity_resolution/binary/expected.csv          |   6 +
 .../entity_resolution/binary/expected.csv.mtd      |   7 +
 .../entity_resolution/binary/input.csv             |   7 +
 .../entity_resolution/binary/input.csv.mtd         |   7 +
 .../entity_resolution/blocking/blocking_naive.dml  |  31 +--
 .../entity_resolution/clustering/expected.csv      |   6 +
 .../entity_resolution/clustering/expected.csv.mtd  |   7 +
 .../entity_resolution/clustering/input.csv         |   7 +
 .../entity_resolution/clustering/input.csv.mtd     |   7 +
 .../cluster_by_connected_components.dml            |  31 +--
 31 files changed, 2040 insertions(+), 79 deletions(-)

diff --git a/.github/workflows/applicationTests.yml b/.github/workflows/applicationTests.yml
index 652b31a..5b36aab 100644
--- a/.github/workflows/applicationTests.yml
+++ b/.github/workflows/applicationTests.yml
@@ -35,7 +35,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
+        tests: [A,B,C,E,G,H,I,L,M,N,O,P,S,U,W]
         os: [ubuntu-latest]
     name:  Ap Test ${{ matrix.tests }} 
     steps:
diff --git a/dev/Tasks-obsolete.txt b/dev/Tasks-obsolete.txt
index 561ef48..17dd708 100644
--- a/dev/Tasks-obsolete.txt
+++ b/dev/Tasks-obsolete.txt
@@ -236,6 +236,7 @@ SYSTEMDS-260 Misc Tools
  * 262 Data augmentation tool for data cleaning                       OK
  * 263 ONNX graph importer (Python API, docs, tests)                  OK
  * 264 ONNX graph exporter
+ * 265 Entity resolution pipelines and primitives                     OK
 
 SYSTEMDS-270 Compressed Matrix Blocks
  * 271 Reintroduce compressed matrix blocks from SystemML             OK
diff --git a/docs/index.md b/docs/index.md
index bb072ab..7b09939 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -36,6 +36,7 @@ Various forms of documentation for SystemDS are available.
 
 - a [DML language reference](./site/dml-language-reference) for an list of operations possible inside SystemDS.
 - [builtin functions](./site/builtins-reference) contains a collection of builtin functions providing an high level abstraction on complex machine learning algorithms.
+- [Entity Resolution](./site/entity-resolution) provides a collection of customizable entity resolution primitives and pipelines.
 - [Run SystemDS](./site/run) contains an Helloworld example along with an environment setup guide.
 - Instructions on python can be found at [Python Documentation](./api/python/index)
 - The [javadoc API](./api/java/index) contains internal documentation of the system source code.
diff --git a/docs/site/entity-resolution.md b/docs/site/entity-resolution.md
new file mode 100644
index 0000000..593d712
--- /dev/null
+++ b/docs/site/entity-resolution.md
@@ -0,0 +1,137 @@
+---
+layout: site
+title: Entity Resolution
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+## Pipeline design and primitives
+
+We provide two example scripts, `entity-clustering.dml` and `binary-entity-resolution.dml`. Both handle reading input
+files and writing output files, and call the pipeline functions provided in `primitives/pipeline.dml`.
+
+The pipeline design is loosely based on the following paper, but does not use advanced features such as multi-probe LSH,
+combining embeddings via LSTM, or classification via machine learning.
+
+```
+Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+```
+
+### Input files
+
+The provided scripts can read two types of input files. The token file is mandatory since it contains the row identifiers, 
+but the embedding file is optional. The actual use of tokens and/or embeddings can be configured via command line parameters 
+to the scripts.
+
+##### Token files
+
+This file type is a CSV file with 3 columns. The first column is the string or integer row identifier, the second is the
+string token, and the third is the number of occurrences. This simple format is used as a bag-of-words representation.
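The conversion from this token format to a bag-of-words matrix can be sketched as follows. This is an illustrative Python sketch, not the DML implementation; the function name `tokens_to_bow` and the in-memory tuple input are assumptions for illustration.

```python
# Hypothetical sketch of building a bag-of-words matrix from
# (id, token, count) rows, mirroring the role of the DML
# convert_frame_tokens_to_matrix_bow primitive.
def tokens_to_bow(rows):
    """rows: iterable of (row_id, token, count) tuples."""
    ids = sorted({r for r, _, _ in rows})        # one matrix row per identifier
    vocab = sorted({t for _, t, _ in rows})      # one matrix column per token
    id_idx = {r: i for i, r in enumerate(ids)}
    tok_idx = {t: j for j, t in enumerate(vocab)}
    X = [[0.0] * len(vocab) for _ in ids]
    for r, t, c in rows:
        X[id_idx[r]][tok_idx[t]] += c            # accumulate token counts
    return ids, vocab, X

ids, vocab, X = tokens_to_bow([(1, "univ", 2), (1, "graz", 1), (2, "univ", 1)])
```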
+
+##### Embedding files
+
+This file type is a CSV matrix file in which each row contains arbitrary-dimensional embeddings. The order of row identifiers
+is assumed to be the same as in the token file, which saves some computation and storage time; this assumption could be
+lifted with some modifications to the example scripts.
+
+### Primitives
+
+While the example scripts may be sufficient for many simple use cases, we aim to provide a toolkit of composable functions
+to facilitate more complex tasks. The top-level pipelines are defined as a couple of functions in `primitives/pipeline.dml`.
+The goal is that it should be relatively easy to copy one of these pipelines and swap out the primitive functions used
+to create a custom pipeline.
+
+To convert the input token file into a bag-of-words contingency table representation, we provide the functions
+`convert_frame_tokens_to_matrix_bow` and `convert_frame_tokens_to_matrix_bow_2` in `primitives/preprocessing.dml`.
+The latter computes a compatible contingency table with a matching vocabulary for binary entity resolution.
+
+We provide naive, constant-size blocking and locality-sensitive hashing (LSH) as functions in `primitives/blocking.dml`.
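The core idea behind the LSH variant can be sketched with random-hyperplane signatures: rows whose signatures collide in any hash table become candidate pairs. This Python sketch is illustrative only (not the DML code in `primitives/blocking.dml`); the function name and the fixed seed are assumptions.

```python
# Illustrative random-hyperplane LSH: each table hashes every row to a
# tuple of sign bits from num_hyperplanes random projections.
import random

def lsh_signatures(X, num_hashtables, num_hyperplanes, seed=42):
    rng = random.Random(seed)
    dim = len(X[0])
    sigs = []
    for _ in range(num_hashtables):
        # one set of random hyperplanes (Gaussian normals) per table
        planes = [[rng.gauss(0, 1) for _ in range(dim)]
                  for _ in range(num_hyperplanes)]
        table = []
        for x in X:
            bits = tuple(int(sum(p_i * x_i for p_i, x_i in zip(p, x)) >= 0)
                         for p in planes)
            table.append(bits)
        sigs.append(table)
    return sigs
```

Rows with identical feature vectors always receive identical signatures, so near-duplicates have a high chance of sharing a bucket in at least one table.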
+
+For entity clustering, we only provide a simple clustering approach which makes all connected components in an adjacency
+matrix fully connected. This function is located in `primitives/clustering.dml`.
+
+To restore an adjacency matrix to a list of pairs, we provide the functions `untable` and `untable_offset` in 
+`primitives/postprocessing.dml`.
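The effect of `untable` can be sketched as follows: the symmetric adjacency/similarity matrix is flattened back into a pair list. This Python sketch is an assumption about the behavior (it keeps only the upper triangle and uses 1-based ids as DML does), not the DML implementation itself.

```python
# Turn an adjacency/similarity matrix into a list of (row, col, value)
# pairs; only the upper triangle is emitted to avoid duplicate pairs.
def untable(M):
    pairs = []
    for i in range(len(M)):
        for j in range(i + 1, len(M[i])):
            if M[i][j] > 0:
                pairs.append((i + 1, j + 1, M[i][j]))  # 1-based ids as in DML
    return pairs
```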
+
+Finally, `primitives/evaluation.dml` defines some metrics that can be used to evaluate the performance of the entity
+resolution pipelines. They are used in the script `eval-entity-resolution.dml`. 
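Pair-level evaluation boils down to comparing the predicted pair set against the ground-truth pair set. A minimal sketch of precision, recall, and F1 over such pair sets (illustrative Python, not the DML in `primitives/evaluation.dml`; function name assumed):

```python
# Precision/recall/F1 over predicted vs. ground-truth match pairs.
def pair_f1(pred, truth):
    pred, truth = set(pred), set(truth)
    tp = len(pred & truth)                         # correctly predicted pairs
    precision = tp / len(pred) if pred else 0.0
    recall = tp / len(truth) if truth else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return precision, recall, f1
```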
+
+## Testing and Examples
+
+A test data repository that was used to develop these scripts is available at
+[repo](https://github.com/skogler/systemds-amls-project-data). The examples below assume that this repository is
+cloned as `data` in the SystemDS root folder. The data in that repository is sourced from the Uni Leipzig entity resolution
+[benchmark](https://dbs.uni-leipzig.de/research/projects/object_matching/benchmark_datasets_for_entity_resolution).
+
+### Preprocessing
+
+Since there is no tokenization functionality in SystemDS yet, we provide a Python preprocessing script in the data repository
+that tokenizes the text columns and performs some simple embedding lookup using Glove embeddings.
+
+The tokens are written as CSV files to enable Bag-of-Words representations as well as matrices with combined embeddings.
+Depending on the type of data, one or the other or a combination of both may work better. The SystemDS DML scripts can be
+called with different parameters to experiment with this.
+
+### Entity Clustering
+
+In this case we detect duplicates within one database. As an example, we use the benchmark dataset Affiliations from Uni Leipzig.
+For this dataset, embeddings do not work well since the data is mostly just names. Therefore, we encode it as Bag-of-Words vectors
+in the example below. This dataset would benefit from more preprocessing, as simply matching words for all the different kinds of
+abbreviations does not work particularly well.
+
+Example command to run on Affiliations dataset:
+```
+./bin/systemds ./scripts/algorithms/entity-resolution/entity-clustering.dml -nvargs FX=data/affiliationstrings/affiliationstrings_tokens.csv OUT=data/affiliationstrings/affiliationstrings_res.csv store_mapping=FALSE MX=data/affiliationstrings/affiliationstrings_MX.csv use_embeddings=FALSE XE=data/affiliationstrings/affiliationstrings_embeddings.csv
+```
+Evaluation:
+```
+./bin/systemds ./scripts/algorithms/entity-resolution/eval-entity-resolution.dml -nvargs FX=data/affiliationstrings/affiliationstrings_res.csv FY=data/affiliationstrings/affiliationstrings_mapping_fixed.csv
+```
+
+### Binary Entity Resolution
+
+In this case we detect duplicate pairs of rows between two databases. As an example, we use the benchmark dataset DBLP-ACM from Uni Leipzig.
+Embeddings work well for this dataset, so the results are quite good, with an F1 score of 0.89.
+
+Example command to run on DBLP-ACM dataset with embeddings:
+```
+./bin/systemds ./scripts/algorithms/entity-resolution/binary-entity-resolution.dml -nvargs FY=data/DBLP-ACM/ACM_tokens.csv FX=data/DBLP-ACM/DBLP2_tokens.csv MX=data/DBLP-ACM_MX.csv OUT=data/DBLP-ACM/DBLP-ACM_res.csv XE=data/DBLP-ACM/DBLP2_embeddings.csv YE=data/DBLP-ACM/ACM_embeddings.csv use_embeddings=TRUE
+```
+Evaluation:
+```
+./bin/systemds ./scripts/algorithms/entity-resolution/eval-entity-resolution.dml -nvargs FX=data/DBLP-ACM/DBLP-ACM_res.csv FY=data/DBLP-ACM/DBLP-ACM_perfectMapping.csv
+```
+
+## Future Work
+
+1. Better clustering algorithms.
+    1. Correlation clustering.
+    2. Markov clustering.
+    3. See [this link](https://dbs.uni-leipzig.de/en/publication/title/comparative_evaluation_of_distributed_clustering_schemes_for_multi_source_entity_resolution) for more approaches.
+2. Multi-Probe LSH to improve runtime performance. 
+    1. Probably as a SystemDS built-in to be more efficient.
+3. Classifier-based matching.
+    1. Use an SVM classifier to decide whether two tuples are duplicates, instead of a similarity threshold.
+4. Better/built-in tokenization.
+    1. Implement text tokenization as a component of SystemDS.
+    2. Offer a choice of different preprocessing and tokenization algorithms (e.g. stemming, word-piece tokenization).
+5. Better/built-in embeddings.
+    1. Implement embedding generation as a component of SystemDS.
+    2. Use an LSTM to compose embeddings.
\ No newline at end of file
diff --git a/scripts/staging/entity-resolution/binary-entity-resolution.dml b/scripts/staging/entity-resolution/binary-entity-resolution.dml
new file mode 100644
index 0000000..a8337e8
--- /dev/null
+++ b/scripts/staging/entity-resolution/binary-entity-resolution.dml
@@ -0,0 +1,102 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT PERFORMS AN ENTITY RESOLUTION PIPELINE FOR BINARY MATCHING ON TWO FILES
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME           TYPE   DEFAULT  MEANING
+# ---------------------------------------------------------------------------------------------
+# FX              String  ---     Location to read the frame of tokens in bow format for the first dataset
+#                                 Each line contains a comma separated list of id, token and value
+# FY              String  ---     Location to read the frame of tokens in bow format for the second dataset
+#                                 Each line contains a comma separated list of id, token and value
+# OUT             String  ---     Location to save the output of matching pairs
+#                                 Each line contains comma separated ids of one matched pair
+#                                 The first column holds ids of the first dataset, the second column those of the second
+#                                 The third column provides the similarity score
+# threshold       Double  0.9     Threshold to be considered as a match
+# num_hashtables  Int     6       Number of hashtables for LSH blocking.
+# num_hyperplanes Int     4       Number of hyperplanes for LSH blocking.
+# use_tokens      Boolean TRUE    Whether to use the tokens of FX and FY to generate predictions
+# use_embeddings  Boolean FALSE   Whether to use the embeddings of XE and YE to generate predictions
+# XE              String  ---     Location to read the frame of the embedding matrix for the first dataset
+#                                 Required if use_embeddings is set to TRUE
+# YE              String  ---     Location to read the frame of the embedding matrix for the second dataset
+#                                 Required if use_embeddings is set to TRUE
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: frame of matching pairs
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/postprocessing.dml") as post;
+source("./scripts/staging/entity-resolution/primitives/preprocessing.dml") as pre;
+source("./scripts/staging/entity-resolution/primitives/pipeline.dml") as pipe;
+
+# Command Line Arguments
+fileFX = $FX;
+fileFY = $FY;
+fileOUT = $OUT;
+
+threshold = ifdef($threshold, 0.9);
+num_hashtables = ifdef($num_hashtables, 6);
+num_hyperplanes = ifdef($num_hyperplanes, 4);
+
+use_tokens = ifdef($use_tokens, TRUE);
+use_embeddings = ifdef($use_embeddings, FALSE);
+# file XE and YE is only required if using embeddings
+fileXE = ifdef($XE, "");
+fileYE = ifdef($YE, "");
+
+# Read data
+FX = read(fileFX);
+FY = read(fileFY);
+if (use_embeddings) {
+  if (fileXE == "" | fileYE == "") {
+    print("You need to specify files XE and YE when use_embeddings is set to TRUE");
+  } else {
+    X_embeddings = read(fileXE);
+    Y_embeddings = read(fileYE);
+  }
+}
+
+# Convert data
+[X, Y, M_tokens, MX_ids, MY_ids] = pre::convert_frame_tokens_to_matrix_bow_2(FX,FY);
+if (use_tokens & use_embeddings) {
+  X = cbind(X, X_embeddings);
+  Y = cbind(Y, Y_embeddings);
+} else if (use_tokens) {
+  # Nothing to do in this case, since X already contains tokens
+} else if (use_embeddings) {
+  X = X_embeddings;
+  Y = Y_embeddings;
+} else {
+  print("Either use_tokens or use_embeddings needs to be TRUE, using tokens only as default.");
+}
+# Perform matching
+THRES = pipe::binary_entity_resolution_pipeline_lsh(X, Y, num_hashtables, num_hyperplanes, threshold);
+sparse = post::untable(THRES);
+
+# Write results
+X_dec = transformdecode(target=sparse[,1], meta=MX_ids[,1], spec="{recode:[C1]}");
+Y_dec = transformdecode(target=sparse[,2], meta=MY_ids[,1], spec="{recode:[C1]}");
+output = cbind(cbind(X_dec, Y_dec), as.frame(sparse[,3]));
+write(output, fileOUT, sep=",", sparse=FALSE, format="csv");
diff --git a/scripts/staging/entity-resolution/entity-clustering.dml b/scripts/staging/entity-resolution/entity-clustering.dml
new file mode 100644
index 0000000..eb11df6
--- /dev/null
+++ b/scripts/staging/entity-resolution/entity-clustering.dml
@@ -0,0 +1,117 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT PERFORMS AN ENTITY RESOLUTION PIPELINE FOR CLUSTERING ON A SINGLE FILE
+# CONSISTS OF BLOCKING, MATCHING, AND CLUSTERING
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME           TYPE   DEFAULT  MEANING
+# ---------------------------------------------------------------------------------------------
+# FX              String  ---     Location to read the frame of tokens in bow format
+#                                 Each line contains a comma separated list of id, token and value
+# OUT             String  ---     Location to save the output of matching pairs
+#                                 Each line contains comma separated ids of one matched pair
+#                                 The third column provides the similarity score
+# threshold       Double  0.9     Threshold to be considered as a match
+# blocking_method String  lsh     Possible values: ["naive", "lsh"].
+# num_blocks      Int     1       Number of blocks for naive blocking
+# num_hashtables  Int     6       Number of hashtables for LSH blocking.
+# num_hyperplanes Int     4       Number of hyperplanes for LSH blocking.
+# use_tokens      Boolean TRUE    Whether to use the tokens of FX to generate predictions
+# use_embeddings  Boolean FALSE   Whether to use the embeddings of XE to generate predictions
+# XE              String  ---     Location to read the frame of embedding matrix
+#                                 Required if use_embeddings is set to TRUE
+# store_mapping   Boolean FALSE   Whether to store the mapping of transformencode
+# MX              String  ---     Location to write the frame of mapping
+#                                 Required if store_mapping is set to TRUE
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: frame of matching pairs
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/preprocessing.dml") as pre;
+source("./scripts/staging/entity-resolution/primitives/postprocessing.dml") as post;
+source("./scripts/staging/entity-resolution/primitives/pipeline.dml") as pipe;
+
+# Command Line Arguments
+fileFX = $FX;
+fileOUT = $OUT;
+
+threshold = ifdef($threshold, 0.9);
+blocking_method = ifdef($blocking_method, "lsh");
+num_blocks = ifdef($num_blocks, 1);
+num_hyperplanes = ifdef($num_hyperplanes, 4);
+num_hashtables = ifdef($num_hashtables, 6);
+use_tokens = ifdef($use_tokens, TRUE);
+use_embeddings = ifdef($use_embeddings, FALSE);
+# file XE is only required if using embeddings
+fileXE = ifdef($XE, "");
+# mapping file is required for evaluation
+store_mapping = ifdef($store_mapping, FALSE);
+fileMX = ifdef($MX, "");
+
+if (!(blocking_method == "naive" | blocking_method == "lsh")) {
+  print("ERROR: blocking method must be in ['naive', 'lsh']");
+}
+
+# Read data
+FX = read(fileFX);
+if (use_embeddings) {
+  if (fileXE == "")
+    print("You need to specify file XE when use_embeddings is set to TRUE");
+  else
+    X_embeddings = read(fileXE);
+}
+
+# Convert data
+[X, MX] = pre::convert_frame_tokens_to_matrix_bow(FX);
+if (use_tokens & use_embeddings) {
+  X = cbind(X, X_embeddings);
+} else if (use_tokens) {
+  # Nothing to do in this case, since X already contains tokens
+} else if (use_embeddings) {
+  X = X_embeddings;
+} else {
+  print("Either use_tokens or use_embeddings needs to be TRUE, using tokens only as default.");
+}
+
+if (store_mapping) {
+  if (fileMX == "")
+    print("You need to specify file MX when store_mapping is set to TRUE.");
+  else 
+    write(MX, fileMX);
+}
+
+# Perform clustering
+if (blocking_method == "naive") {
+  CLUSTER = pipe::entity_clustering_pipeline(X, num_blocks, threshold);
+} else if (blocking_method == "lsh") {
+  CLUSTER = pipe::entity_clustering_pipeline_lsh(X, num_hashtables, num_hyperplanes, threshold);
+}
+MATCH = (CLUSTER > 0);
+
+# Write results
+sparse = post::untable(CLUSTER);
+dec = transformdecode(target=sparse, meta=cbind(MX[,1],MX[,1]), spec="{recode:[C1,C2]}");
+output = cbind(dec, as.frame(sparse[,3]));
+write(output, fileOUT, sep=",", sparse=FALSE, format="csv");
diff --git a/scripts/staging/entity-resolution/eval-entity-resolution.dml b/scripts/staging/entity-resolution/eval-entity-resolution.dml
new file mode 100644
index 0000000..cc48497
--- /dev/null
+++ b/scripts/staging/entity-resolution/eval-entity-resolution.dml
@@ -0,0 +1,59 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT EVALUATES THE PREDICTIONS OF THE ENTITY RESOLUTION AGAINST THE GROUND TRUTH
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME  TYPE   DEFAULT  MEANING
+# ---------------------------------------------------------------------------------------------
+# FX    String  ---     Location to read the frame of the predictions
+#                       Each line contains comma separated ids of one matched pair
+#                       Remaining columns (>2) are ignored
+# FY    String  ---     Location to read the frame of the ground truth
+#                       Each line contains comma separated ids of one matched pair
+#                       Remaining columns (>2) are ignored
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: prints different evaluation metrics (accuracy, F1)
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/evaluation.dml") as eval;
+
+# Command Line Arguments
+fileFX = $FX;
+fileFY = $FY;
+
+use_MX = ifdef($use_MX, FALSE);
+
+# Read data
+FX = read(fileFX);
+FY = read(fileFY);
+
+# Transform the data
+[XY, MX] = transformencode(target=rbind(FX[,1:2],FY[,1:2]), spec="{recode:[C1,C2]}");
+X = XY[1:nrow(FX),];
+Y = XY[nrow(FX)+1:nrow(FX)+nrow(FY),];
+PRED = table(X[,1], X[,2], nrow(MX), nrow(MX));
+GT = table(Y[,1], Y[,2], nrow(MX), nrow(MX));
+
+# Perform the evaluation
+eval::print_eval_stats(PRED, GT);
diff --git a/scripts/staging/entity-resolution/primitives/blocking.dml b/scripts/staging/entity-resolution/primitives/blocking.dml
new file mode 100644
index 0000000..baa9aee
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/blocking.dml
@@ -0,0 +1,292 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Splits the rows of X into num_blocks non-overlapping regions.
+# May produce fewer than num_blocks regions.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       A dataset with rows to split into blocks.
+# num_blocks    Integer ---       How many blocks to produce.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# BLOCKS         Double   A column vector with the start index of each block. Has one more row
+#                         than there are blocks, so the last entry is the (exclusive) end index of the last block.
+# --------------------------------------------------------------------------------------------
+naive_blocking = function(Matrix[Double] X, Integer num_blocks) return (Matrix[Double] BLOCKS) {
+  block_size_flt = nrow(X) / num_blocks;
+  if (block_size_flt < 1.0) {
+    BLOCKS = seq(1, nrow(X) + 1);
+  } else {
+    block_size = ceil(block_size_flt);
+    BLOCKS = rbind(as.matrix(1), 1 + block_size * seq(1, num_blocks));
+    BLOCKS[num_blocks+1,] = nrow(X) + 1;
+  }
+}
+
+# Sorts the rows of dataset X by the vector v, in ascending order of v.
+# Optionally returns v as the first column of the sorted X, depending on parameter prepend_v.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       Any matrix with rows to be sorted.
+# v             matrix  ---       A vector with the same number of rows as X. Defines ordering.
+# prepend_v     boolean ---       Whether to return v as first column of X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_index       matrix  A vector with the original row number for each row in X.
+# X_sorted      matrix  The sorted input matrix X.
+# --------------------------------------------------------------------------------------------
+sort_by_vector = function(Matrix[Double] X, Matrix[Double] v, Boolean prepend_v) return (Matrix[Double] X_index, Matrix[Double] X_sorted) {
+  X_index = order(target=v, by=1, decreasing = FALSE, index.return=TRUE);
+  X_sorted = order(target=cbind(v, X), by=1, decreasing = FALSE, index.return=FALSE);
+  if (!prepend_v) {
+    X_sorted = X_sorted[,2:ncol(X_sorted)];
+  }
+}
+
+# Sorts the rows in dataset X by their sum.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       Any matrix with rows to be sorted.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_index       matrix  A vector with the original row number for each row in X.
+# X_sorted      matrix  The sorted input matrix X.
+# --------------------------------------------------------------------------------------------
+row_sum_sorting = function(Matrix[Double] X) return (Matrix[Double] X_index, Matrix[Double] X_sorted) {
+  X_rowSum = rowSums(X);
+  [X_index, X_sorted] = sort_by_vector(X, X_rowSum, FALSE);
+}
+
+# Sorts the rows in dataset X by their original row numbers (index).
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       Any matrix with rows to be sorted.
+# X_index       matrix  ---       The original row numbers to be restored.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_reindex     matrix  The reindexed matrix X.
+# --------------------------------------------------------------------------------------------
+reindex_rowwise = function(Matrix[Double] X, Matrix[Double] X_index) return (Matrix[Double] X_reindex) {
+  X_conc = cbind(X_index, X);
+  X_reindex = order(target=X_conc, by=1, decreasing = FALSE, index.return=FALSE);
+  # Remove index column
+  X_reindex = X_reindex[,2:ncol(X_reindex)];
+}
+
+# Sorts both rows and columns in dataset X by their original row numbers (index).
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       Any matrix that should be resorted.
+# X_index       matrix  ---       The original row numbers to be restored.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_reindex     matrix  The reindexed matrix X.
+# --------------------------------------------------------------------------------------------
+reindex_rows_and_cols = function(Matrix[Double] X, Matrix[Double] X_index) return (Matrix[Double] X_reindex) {
+  # First reindex rows
+  X_reindex = X;
+  X_reindex = reindex_rowwise(X_reindex, X_index);
+  # Then transpose and repeat
+  X_reindex = t(X_reindex);
+  X_reindex = reindex_rowwise(X_reindex, X_index);
+}
+
+# Generates a random matrix of hyperplane parameters.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# num_hyperplanes  Integer ---       How many hyperplanes to generate.
+# dimension        Integer ---       The number of parameters per hyperplane.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# H              matrix   A num_hyperplanes x dimension matrix of hyperplane parameters.
+# --------------------------------------------------------------------------------------------
+gen_rand_hyperplanes = function(Integer num_hyperplanes, Integer dimension) return (Matrix[Double] H) {
+  H = rand(rows=num_hyperplanes, cols=dimension, min=-1, max=1);
+}
+
+# Creates a scalar hash code for each row in X_code by interpreting the signs of its
+# values as a binary number (positive entries map to 1, all others to 0).
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X_code          matrix  ---       The result of a plane equation for each row in X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_hash        matrix   A column vector of scalar hash codes. Same number of rows as X_code.
+# --------------------------------------------------------------------------------------------
+aggregate_lsh_code = function(Matrix[Double] X_code) return (Matrix[Double] X_hash) {
+  X_pos = (X_code > 0);
+  pos_weights = 2^seq(ncol(X_code)-1, 0);
+  X_weights = X_pos * t(pos_weights);
+  X_hash = rowSums(X_weights);
+}
+
+# Computes locality-sensitive hash codes for each row in X.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A matrix that should be hashed.
+# H               matrix  ---       Hyperplanes matrix w. shape (num_hyperplanes, ncol(X))
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_hash        matrix   A column vector with hash codes. Same number of rows as X.
+# --------------------------------------------------------------------------------------------
+lsh = function(Matrix[Double] X, Matrix[Double] H) return (Matrix[Double] X_hash) {
+  X_plane = X %*% t(H);
+  X_hash = aggregate_lsh_code(X_plane);
+}
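+# The lsh and aggregate_lsh_code primitives above can be sketched together in NumPy.
+# This is an illustrative translation, not part of the DML scripts; the seed and
+# shapes are arbitrary:
+#
+# ```python
+# import numpy as np
+#
+# def lsh_hash(X, H):
+#     """Hash each row of X by which side of each hyperplane in H it falls on.
+#     The sign pattern across hyperplanes is read as a binary number."""
+#     signs = (X @ H.T) > 0                              # one bit per hyperplane
+#     weights = 2 ** np.arange(H.shape[0] - 1, -1, -1)   # big-endian bit weights
+#     return signs @ weights                             # scalar hash code per row
+#
+# rng = np.random.default_rng(0)
+# X = rng.normal(size=(5, 3))
+# H = rng.uniform(-1, 1, size=(2, 3))   # 2 hyperplanes -> hash codes in 0..3
+# codes = lsh_hash(X, H)
+# ```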
+
+# Finds the contiguous blocks of equal hash codes in a hash-sorted matrix X.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       A matrix sorted by hash code, with the hash code in column 1.
+# dimensions    Integer ---       The number of hyperplanes, i.e. hash codes lie in
+#                                  [0, 2^dimensions - 1].
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# BLOCKS        matrix   A column vector with the start row index for each possible hash code,
+#                         plus a final entry holding the end index of the last block.
+# --------------------------------------------------------------------------------------------
+lsh_find_blocks = function(Matrix[Double] X, Integer dimensions) return (Matrix[Double] BLOCKS){
+  BLOCKS = matrix(0, rows=2^dimensions+1, cols=1)
+  current_hash_value = 0.0;
+  BLOCKS[1,1] = 1
+  for (row_idx in 1:nrow(X)) {
+    this_hash = as.scalar(X[row_idx,1]);
+    if (current_hash_value < this_hash) {
+      for (h in (current_hash_value+2):(this_hash+1)) {
+        BLOCKS[h,1] = row_idx;
+      }
+      current_hash_value = this_hash;
+    }
+  }
+  n_missing_rows = nrow(BLOCKS) - (current_hash_value + 1);
+  BLOCKS[(current_hash_value+2):nrow(BLOCKS),] = matrix(nrow(X) + 1, rows=n_missing_rows, cols=1);
+}
+
+# Blocks the given matrix X by locality sensitive hashing using a set of random hyperplanes.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A matrix that should be blocked by LSH.
+# num_hyperplanes Integer ---       How many hyperplanes to use.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_index       matrix  A vector with the original row number for each row in X.
+# X_hash        matrix  A column vector with the scalar LSH hash code for each row in X.
+# X_sorted      matrix  The sorted input matrix X.
+# BLOCKS        matrix  A column vector with start indices for each block. Has one more row
+#                        than blocks such that the last row is the end index of the last block.
+# --------------------------------------------------------------------------------------------
+lsh_blocking = function(Matrix[Double] X, Integer num_hyperplanes) return (Matrix[Double] X_index, Matrix[Double] X_hash, Matrix[Double] X_sorted, Matrix[Double] BLOCKS) {
+  H = gen_rand_hyperplanes(num_hyperplanes, ncol(X));
+  X_hash = lsh(X, H);
+  [X_index, X_sorted_with_hash] = sort_by_vector(X, X_hash, TRUE);
+  BLOCKS = lsh_find_blocks(X_sorted_with_hash, num_hyperplanes);
+  B_lengths = rbind(BLOCKS[2:nrow(BLOCKS),] - BLOCKS[1:nrow(BLOCKS)-1,], as.matrix(1));
+  BLOCKS = removeEmpty(target=BLOCKS, margin="rows", select=B_lengths)
+  X_sorted = X_sorted_with_hash[,2:ncol(X_sorted_with_hash)];
+  X_hash = X_sorted_with_hash[,1];
+}
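+# The lsh_blocking flow above (hash with random hyperplanes, sort by hash, find block
+# boundaries, drop empty blocks) can be sketched in NumPy. This is an illustrative,
+# 0-based translation, not part of the DML scripts:
+#
+# ```python
+# import numpy as np
+#
+# def lsh_blocking(X, num_hyperplanes, rng=None):
+#     """Hash rows, sort them by hash code, and return non-empty block boundaries."""
+#     rng = rng or np.random.default_rng(0)
+#     H = rng.uniform(-1, 1, size=(num_hyperplanes, X.shape[1]))
+#     codes = ((X @ H.T) > 0) @ (2 ** np.arange(num_hyperplanes - 1, -1, -1))
+#     order = np.argsort(codes, kind="stable")
+#     codes_sorted = codes[order]
+#     # start index of every run of equal hash codes, plus an end sentinel
+#     starts = np.flatnonzero(np.r_[True, np.diff(codes_sorted) != 0])
+#     blocks = np.r_[starts, len(codes)]
+#     return order, X[order], codes_sorted, blocks
+# ```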
+
+# Blocks the matrices X and Y by locality sensitive hashing using a set of random hyperplanes.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A matrix that should be blocked by LSH.
+# Y               matrix  ---       A matrix that should be blocked by LSH.
+# num_hyperplanes Integer ---       How many hyperplanes to use.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_index       matrix  A vector with the original row number for each row in X.
+# X_hash        matrix  A column vector with the scalar LSH hash code for each row in X.
+# X_sorted      matrix  The sorted input matrix X.
+# Y_index       matrix  A vector with the original row number for each row in Y.
+# Y_hash        matrix  A column vector with the scalar LSH hash code for each row in Y.
+# Y_sorted      matrix  The sorted input matrix Y.
+# BLOCKS        matrix  A matrix with start indices for each block. Has one more row
+#                        than blocks such that the last row is the end index of the last block.
+#                        First column is X, second column Y indices.
+# --------------------------------------------------------------------------------------------
+lsh_blocking2 = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hyperplanes) return (
+        Matrix[Double] X_index, Matrix[Double] X_hash, Matrix[Double] X_sorted,
+        Matrix[Double] Y_index, Matrix[Double] Y_hash, Matrix[Double] Y_sorted,
+        Matrix[Double] BLOCKS) {
+  H = gen_rand_hyperplanes(num_hyperplanes, max(ncol(X), ncol(Y)));
+  X_hash = lsh(X, H[,1:ncol(X)]);
+  Y_hash = lsh(Y, H[,1:ncol(Y)]);
+  [X_index, Xs] = sort_by_vector(X, X_hash, TRUE);
+  [Y_index, Ys] = sort_by_vector(Y, Y_hash, TRUE);
+  X_blocks = lsh_find_blocks(Xs, num_hyperplanes);
+  Y_blocks = lsh_find_blocks(Ys, num_hyperplanes);
+  BLOCKS = cbind(X_blocks, Y_blocks)
+  B_lengths = rbind(BLOCKS[2:nrow(BLOCKS),] - BLOCKS[1:nrow(BLOCKS)-1,], matrix(1, rows=1, cols=2));
+  BLOCKS = removeEmpty(target=BLOCKS, margin="rows", select=B_lengths)
+  X_sorted = Xs[,2:ncol(Xs)];
+  X_hash = Xs[,1];
+  Y_sorted = Ys[,2:ncol(Ys)];
+  Y_hash = Ys[,1];
+}
diff --git a/scripts/staging/entity-resolution/primitives/clustering.dml b/scripts/staging/entity-resolution/primitives/clustering.dml
new file mode 100644
index 0000000..6c36476
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/clustering.dml
@@ -0,0 +1,158 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Recursive depth-first search which in some cases can run into stack overflow errors in the
+# SystemDS runtime. Not recommended, use dfs_iter instead!
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# GRAPH         matrix  ---       An adjacency matrix.
+# vertex        Integer ---       The vertex to start the search from.
+# marked        matrix  ---       Column-vector which is used to mark already visited
+#                                  vertices. A value of 1 means already visited, 0 means
+#                                  unvisited.
+# id            matrix  ---       Column-vector which the connected component of each vertex
+#                                  is written to.
+# count         Integer ---       The component id which is written to each visited vertex's
+#                                  entry in the id vector.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# id            matrix   Column-vector which the connected component of each vertex is written to.
+# marked        matrix   Column-vector which is used to mark already visited vertices.
+#                         A value of 1 means already visited, 0 means unvisited.
+# --------------------------------------------------------------------------------------------
+dfs = function(Matrix[Double] GRAPH, Integer vertex, Matrix[Double] marked, Matrix[Double] id, Integer count) return (Matrix[Double] id, Matrix[Double] marked) {
+  marked[vertex,] = 1;
+  id[vertex,] = count;
+  for (col in 1:ncol(GRAPH)) {
+    if (as.scalar(GRAPH[vertex,col])) {
+      if (!as.scalar(marked[col,])) {
+        [id, marked] = dfs(GRAPH, col, marked, id, count);
+      }
+    }
+  }
+}
+
+# Iterative depth-first search with a manually allocated stack in order to avoid stack overflow
+# errors.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME              TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# GRAPH             matrix  ---       An adjacency matrix.
+# start_vertex      Integer ---       The vertex to start the search from.
+# marked            matrix  ---       Column-vector which is used to mark already visited
+#                                      vertices. A value of 1 means already visited, 0 means
+#                                      unvisited.
+# id                matrix  ---       Column-vector which the connected component of each vertex
+#                                      is written to.
+# current_component Integer ---       The component id which is written to each visited vertex's
+#                                      entry in the id vector.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# id            matrix   Column-vector which the connected component of each vertex is
+#                         written to.
+# marked        matrix   Column-vector which is used to mark already visited vertices.
+#                         A value of 1 means already visited, 0 means unvisited.
+# --------------------------------------------------------------------------------------------
+dfs_iter = function(Matrix[Double] GRAPH, Integer start_vertex, Matrix[Double] marked, Matrix[Double] id, Integer current_component) return (Matrix[Double] id, Matrix[Double] marked) {
+  nvert =  ncol(GRAPH);
+  stack_idx = matrix(0, rows=nvert, cols=1);
+  stack_vid = matrix(0, rows=nvert, cols=1);
+
+  stack_ptr = 1;
+  stack_vid[stack_ptr,] = start_vertex;
+  stack_idx[stack_ptr,] = 1;
+
+  while(stack_ptr > 0) {
+    vertex = as.scalar(stack_vid[stack_ptr,]);
+    neigh_idx = as.scalar(stack_idx[stack_ptr,]);
+    if (neigh_idx == 1) {
+      id[vertex,] = current_component;
+      marked[vertex,] = 1;
+    }
+    # find next not visited neighbour
+    cont = TRUE;
+    while (cont) {
+        if (as.scalar(GRAPH[vertex, neigh_idx]) & !as.scalar(marked[neigh_idx,])) {
+          cont = FALSE;
+        }
+        else {
+          neigh_idx = neigh_idx + 1;
+          if (neigh_idx > nvert) {
+            cont = FALSE;
+          }
+        }
+    }
+    if (neigh_idx > nvert ) {
+      stack_ptr = stack_ptr - 1;
+    }
+    else if (neigh_idx == nvert & as.scalar(marked[neigh_idx,])) {
+      stack_ptr = stack_ptr - 1;
+    }
+    else {
+      stack_ptr = stack_ptr + 1;
+      stack_vid[stack_ptr,] = neigh_idx;
+      stack_idx[stack_ptr,] = 1;
+    }
+  }
+}
+
+# Makes all connected components in the adjacency matrix ADJACENCY fully connected.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# ADJACENCY     matrix  ---       An adjacency matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# COMPONENTS    matrix   An adjacency matrix with all connected components fully
+#                         connected and a zero diagonal.
+# --------------------------------------------------------------------------------------------
+cluster_by_connected_components = function(Matrix[Double] ADJACENCY) return (Matrix[Double] COMPONENTS) {
+  #TODO could we reuse our components builtin?
+  rows = nrow(ADJACENCY);
+  marked = matrix(0, rows=rows, cols=1);
+  id = matrix(0, rows=nrow(ADJACENCY), cols=1);
+  count = 1;
+  for (s in 1:rows) {
+    if (!as.scalar(marked[s,])) {
+      #[id, marked] = dfs(ADJACENCY, s, marked, id, count);
+      [id, marked] = dfs_iter(ADJACENCY, s, marked, id, count);
+      count = count + 1;
+    }
+  }
+  COMPONENTS = outer(id, t(id), "==");
+  COMPONENTS = COMPONENTS - diag(diag(COMPONENTS));
+}
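+# The clustering step above (label components with an iterative DFS, then make each
+# component fully connected with a zero diagonal) can be sketched in NumPy. This is
+# an illustrative translation, not part of the DML scripts:
+#
+# ```python
+# import numpy as np
+#
+# def cluster_components(adj):
+#     """Label connected components by iterative DFS, then fully connect each one."""
+#     n = adj.shape[0]
+#     comp = np.zeros(n, dtype=int)      # 0 means unvisited
+#     cur = 0
+#     for s in range(n):
+#         if comp[s] == 0:
+#             cur += 1
+#             stack = [s]
+#             comp[s] = cur
+#             while stack:
+#                 v = stack.pop()
+#                 for u in np.nonzero(adj[v])[0]:
+#                     if comp[u] == 0:
+#                         comp[u] = cur
+#                         stack.append(u)
+#     out = (comp[:, None] == comp[None, :]).astype(float)
+#     np.fill_diagonal(out, 0)           # same as subtracting diag(diag(out))
+#     return out
+# ```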
diff --git a/scripts/staging/entity-resolution/primitives/evaluation.dml b/scripts/staging/entity-resolution/primitives/evaluation.dml
new file mode 100644
index 0000000..9601582
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/evaluation.dml
@@ -0,0 +1,142 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Calculates the fraction of correctly predicted values in PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# PRED          matrix  ---       Predicted values, same shape as GT.
+# GT            matrix  ---       Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# score         Double   Fraction of correct values. 0 means all incorrect, 1 means all correct.
+# --------------------------------------------------------------------------------------------
+accuracy = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+  HITS = PRED == GT;
+  sum = sum(HITS);
+  total = length(HITS);
+  score = sum / total;
+}
+
+# Calculates the precision of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the fraction of predicted positives that are actually correct.
+# precision = |true_positives| / |predicted_positives|
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# PRED          matrix  ---       Predicted values, same shape as GT.
+# GT            matrix  ---       Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# score         Double   The precision score, as described above.
+# --------------------------------------------------------------------------------------------
+precision = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+  tp = sum(PRED * GT);
+  score = tp / sum(PRED);
+}
+
+# Calculates the recall of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the fraction of ground-truth positives that are correctly predicted.
+# recall = |true_positives| / |ground_truth_positives|
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# PRED          matrix  ---       Predicted values, same shape as GT.
+# GT            matrix  ---       Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# score         Double   The recall score, as described above.
+# --------------------------------------------------------------------------------------------
+recall = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+  tp = sum(PRED * GT);
+  score = tp / sum(GT);
+}
+
+# Computes 2ab / (a + b), i.e. the harmonic mean of two values, which is the
+# combination of precision and recall used for the F1 score. Note that despite
+# its name this is the harmonic, not the geometric, mean.
+geometric_mean2 = function(Double a, Double b) return (Double geometric_mean) {
+  geometric_mean = 2 * (a * b) / (a + b);
+}
+
+# Calculates the F1 score of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the harmonic mean of the precision and recall scores.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# PRED          matrix  ---       Predicted values, same shape as GT.
+# GT            matrix  ---       Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# f1         Double   The F1 score, as described above.
+# precision  Double   The precision score, as described above.
+# recall     Double   The recall score, as described above.
+# --------------------------------------------------------------------------------------------
+f1 = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double f1, Double precision, Double recall) {
+  precision = precision(PRED, GT);
+  recall = recall(PRED, GT);
+  f1 = geometric_mean2(precision, recall);
+}
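+# The precision/recall/F1 computation above reduces to a few sums over binary 0/1
+# vectors. A minimal Python sketch (illustrative, not part of the DML scripts):
+#
+# ```python
+# def f1_score(pred, gt):
+#     """F1 over binary 0/1 sequences: the harmonic mean of precision and recall."""
+#     tp = sum(p * g for p, g in zip(pred, gt))   # true positives
+#     precision = tp / sum(pred)                  # tp / predicted positives
+#     recall = tp / sum(gt)                       # tp / ground-truth positives
+#     f1 = 2 * precision * recall / (precision + recall)
+#     return f1, precision, recall
+#
+# # e.g. pred=[1,1,0,0], gt=[1,0,1,0] -> tp=1, precision=0.5, recall=0.5, f1=0.5
+# ```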
+
+# Calculates evaluation scores for PRED given a ground truth GT and prints them.
+# In both inputs, the value 0 means no prediction.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# PRED          matrix  ---       Predicted values, same shape as GT.
+# GT            matrix  ---       Ground truth values, same shape as PRED.
+# --------------------------------------------------------------------------------------------
+print_eval_stats = function(Matrix[Double] PRED, Matrix[Double] GT) {
+  acc = accuracy(PRED, GT);
+  [f1, precision, recall] = f1(PRED, GT);
+  print("Evaluation statistics:");
+  print("  PRED_nnz : %d", as.integer(sum(PRED != 0.0)));
+  print("  GT_nnz   : %d", as.integer(sum(GT != 0.0)));
+  print("  Accuracy : %6.4f", acc);
+  print("  Precision: %6.4f", precision);
+  print("  Recall   : %6.4f", recall);
+  print("  F1 score : %6.4f", f1);
+}
diff --git a/scripts/staging/entity-resolution/primitives/matching.dml b/scripts/staging/entity-resolution/primitives/matching.dml
new file mode 100644
index 0000000..5330bb8
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/matching.dml
@@ -0,0 +1,108 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Calculates the cosine similarity between rows of the given matrix X.
+# WARNING: Division by zero will occur for zero rows.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X              matrix  ---       Numeric matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# COS           matrix   Symmetric matrix of cosine similarity between rows of X.
+# --------------------------------------------------------------------------------------------
+cosine = function(Matrix[Double] X) return (Matrix[Double] COS) {
+  Row_norm = sqrt(rowSums(X ^ 2));
+  NORM = Row_norm %*% t(Row_norm);
+  DOT = X %*% t(X);
+  COS = DOT / NORM;
+}
+
+# Calculates the cosine similarity between rows of X and rows of Y.
+# Uses an epsilon value for the euclidean norm of zero rows in order to avoid division by zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X              matrix  ---       Numeric matrix. ncols must match Y.
+# Y              matrix  ---       Numeric matrix. ncols must match X.
+# epsilon        double  ---       The value to replace the norm of zero rows with.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# COS           matrix   nrows(X) by nrows(Y) matrix of cosine similarity between rows of X and Y.
+# --------------------------------------------------------------------------------------------
+cosine2_eps = function(Matrix[Double] X, Matrix[Double] Y, Double epsilon) return (Matrix[Double] COS) {
+  X_row_norm = sqrt(rowSums(X ^ 2));
+  X_row_norm = X_row_norm + (X_row_norm < epsilon) * epsilon
+  Y_row_norm = sqrt(rowSums(Y ^ 2));
+  Y_row_norm = Y_row_norm + (Y_row_norm < epsilon) * epsilon
+  NORM = X_row_norm %*% t(Y_row_norm);
+  DOT = X %*% t(Y);
+  COS = DOT / NORM;
+}
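+# The epsilon-guarded pairwise cosine similarity of cosine2_eps can be sketched in
+# NumPy. This is an illustrative translation, not part of the DML scripts:
+#
+# ```python
+# import numpy as np
+#
+# def cosine2_eps(X, Y, eps=1e-6):
+#     """Pairwise cosine similarity between rows of X and rows of Y,
+#     clamping zero-row norms to eps to avoid division by zero."""
+#     xn = np.sqrt((X ** 2).sum(axis=1))
+#     xn = xn + (xn < eps) * eps        # replace (near-)zero norms with eps
+#     yn = np.sqrt((Y ** 2).sum(axis=1))
+#     yn = yn + (yn < eps) * eps
+#     return (X @ Y.T) / np.outer(xn, yn)
+# ```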
+
+# Calculates the cosine similarity between rows of X and rows of Y.
+# Uses an epsilon value for the euclidean norm of zero rows in order to avoid division by zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X              matrix  ---       Numeric matrix. ncols must match Y.
+# Y              matrix  ---       Numeric matrix. ncols must match X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# COS           matrix   nrows(X) by nrows(Y) matrix of cosine similarity between rows of X and Y.
+# --------------------------------------------------------------------------------------------
+cosine2 = function(Matrix[Double] X, Matrix[Double] Y) return (Matrix[Double] COS) {
+  COS = cosine2_eps(X, Y, 1e-6)
+}
+
+# Sets values of the given matrix that are at or below the given threshold to zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X              matrix  ---       Numeric matrix.
+# threshold      double  ---       Values in X at or below this value are set to 0.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# THRES          matrix   X with values at or below threshold set to 0.
+# --------------------------------------------------------------------------------------------
+tresholding = function(Matrix[Double] X, Double threshold) return (Matrix[Double] THRES) {
+  THRES = (X > threshold) * X;
+}
diff --git a/scripts/staging/entity-resolution/primitives/pipeline.dml b/scripts/staging/entity-resolution/primitives/pipeline.dml
new file mode 100644
index 0000000..e6af155
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/pipeline.dml
@@ -0,0 +1,220 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/blocking.dml") as block;
+source("./scripts/staging/entity-resolution/primitives/matching.dml") as match;
+source("./scripts/staging/entity-resolution/primitives/clustering.dml") as cluster;
+
+
+# Very simple entity clustering pipeline that should work reasonably well for small datasets.
+#
+# Blocks the input dataset X into num_blocks non-overlapping regions after sorting the dataset
+# by the sum of its rows. This is a very simple blocking scheme that serves mainly as a baseline
+# example and will result in suboptimal performance. However, if no blocking is needed, this
+# can be used with num_blocks=1.
+#
+# Uses a similarity threshold to link entities and clusters them by also connecting all
+# entities in each connected component ('makes each connected component fully connected').
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix  ---       A dataset for which duplicates should be found.
+# num_blocks    Integer ---       How many blocks to produce.
+# threshold     Double  ---       Similarity threshold which is used to decide if two entities
+#                                  are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_cluster     matrix   A symmetrical adjacency matrix for X defining the found duplicates.
+# --------------------------------------------------------------------------------------------
+entity_clustering_pipeline = function(Matrix[Double] X, Integer num_blocks, Double threshold) return (Matrix[Double] X_cluster) {
+  # First sort the matrix
+  [X_index, X_sorted] = block::row_sum_sorting(X);
+  X = X_sorted;
+  # Perform blocking: match and cluster each block
+  blocks = block::naive_blocking(X_sorted, num_blocks);
+  X_cluster = matrix(0, nrow(X_sorted), nrow(X_sorted));
+  # SystemDS raises false positives in its parfor dependency check, but the matrix chunks are
+  # addressed independently, so the error is suppressed with check = 0.
+  parfor (i in 1:nrow(blocks)-1, check = 0) {
+    block_start = as.scalar(blocks[i,]);
+    block_end = as.scalar(blocks[i+1,])-1;
+    X_block = X_sorted[block_start:block_end,];
+    X_sim = match::cosine(X_block);
+    X_thres = match::tresholding(X_sim, threshold);
+    X_match = (X_sim > threshold);
+    X_comp = cluster::cluster_by_connected_components(X_match);
+    X_cluster[block_start:block_end,block_start:block_end] = X_comp * X_sim;
+  }
+  # Reindex back the symmetrical matrix
+  X_cluster = block::reindex_rows_and_cols(X_cluster, X_index);
+}
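The steps of this baseline pipeline (sort rows by row sum, split into contiguous blocks, match by cosine similarity within each block, then make each connected component fully connected) can be sketched in NumPy as follows. All names here are ours; the transitive-closure loop is a simple stand-in for the DML clustering primitive and the sketch returns a boolean adjacency matrix rather than the similarity-weighted one:

```python
import numpy as np

def cosine_sim(X):
    # Pairwise cosine similarity between rows of X (epsilon guards zero rows).
    Xn = X / np.maximum(np.linalg.norm(X, axis=1, keepdims=True), 1e-12)
    return Xn @ Xn.T

def connected_components(adj):
    # Transitive closure: make each connected component fully connected.
    n = adj.shape[0]
    reach = (adj + np.eye(n)) > 0
    for _ in range(n):
        new = (reach.astype(int) @ reach.astype(int)) > 0
        if (new == reach).all():
            break
        reach = new
    return reach & ~np.eye(n, dtype=bool)

def entity_clustering(X, num_blocks, threshold):
    order = np.argsort(X.sum(axis=1))          # sort rows by row sum
    Xs = X[order]
    n = X.shape[0]
    bounds = np.linspace(0, n, num_blocks + 1).astype(int)
    out = np.zeros((n, n), dtype=bool)
    for s, e in zip(bounds[:-1], bounds[1:]):  # match/cluster each block
        if e - s == 0:
            continue
        match = cosine_sim(Xs[s:e]) > threshold
        out[s:e, s:e] = connected_components(match)
    inv = np.argsort(order)                    # undo the sort
    return out[inv][:, inv]
```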
+
+# Entity clustering pipeline using locality-sensitive hashing as a blocking algorithm to improve
+# runtime on large datasets. The tradeoff between accuracy and performance can be configured
+# via the num_hashtables and num_hyperplanes parameters.
+#
+# For more details on LSH, see:
+#  Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+#  Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+#
+# Uses a similarity threshold to link entities and clusters them by also connecting all
+# entities in each connected component ('makes each connected component fully connected').
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A dataset for which duplicates should be found.
+# num_hashtables  Integer ---       How often to block the dataset using random hyperplanes and
+#                                     compute similarities.
+#                                     Increases runtime and improves accuracy.
+# num_hyperplanes Integer ---       The dimensionality of the random hyperplanes.
+#                                     Higher values produce smaller blocks and require a higher
+#                                     number of num_hashtables to avoid losing accuracy.
+# threshold       Double  ---       Similarity threshold which is used to decide if two entities
+#                                    are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X_cluster     matrix   A symmetrical adjacency matrix for X defining the found duplicates.
+# --------------------------------------------------------------------------------------------
+entity_clustering_pipeline_lsh = function(Matrix[Double] X, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] X_cluster) {
+  X_cluster = matrix(0, nrow(X), nrow(X));
+  # First get the LSH blocks
+  for (hashtable in 1:num_hashtables, check = 0) {
+    [X_index, X_hash, X_sorted, blocks] = block::lsh_blocking(X, num_hyperplanes);
+    X_cluster_local = matrix(0, nrow(X), nrow(X));
+    ## Perform blocking: match and cluster each block
+    parfor (i in 1:nrow(blocks)-1, check = 0) {
+      block_start = as.scalar(blocks[i,]);
+      block_end = as.scalar(blocks[i+1,])-1;
+      # Only apply to existing blocks
+      X_block = X_sorted[block_start:block_end,];
+      X_sim = match::cosine(X_block);
+      X_thres = match::tresholding(X_sim, threshold);
+      X_match = (X_sim > threshold);
+      X_comp = cluster::cluster_by_connected_components(X_match);
+      X_new_block = X_cluster_local[block_start:block_end,block_start:block_end] | (X_comp * X_sim);
+      # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
+      if (sum(X_new_block) > 0) {
+        X_cluster_local[block_start:block_end,block_start:block_end] = X_new_block;
+      }
+    }
+    ## Reindex back the symmetrical matrix
+    X_cluster = X_cluster | block::reindex_rows_and_cols(X_cluster_local, X_index);
+  }
+}
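The LSH blocking idea referenced above (random-hyperplane signatures, as in Ebraheem et al.) can be sketched like this. This is a conceptual illustration with our own names, not the DML `lsh_blocking` implementation: rows whose projections onto a set of random hyperplanes have the same sign pattern land in the same candidate block, and repeating the process with fresh hyperplanes (one "hashtable" per repetition) recovers pairs a single partition would miss:

```python
import numpy as np

def lsh_buckets(X, num_hyperplanes, rng):
    # Sign of projection onto random hyperplanes gives a bit signature per row.
    H = rng.standard_normal((X.shape[1], num_hyperplanes))
    bits = (X @ H) > 0
    # Pack the bit signature into a single integer key per row.
    keys = bits @ (1 << np.arange(num_hyperplanes))
    buckets = {}
    for i, k in enumerate(keys):
        buckets.setdefault(int(k), []).append(i)
    return buckets
```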
+
+# Very simple binary entity resolution pipeline which computes similarity between rows of two
+# datasets. This mainly serves as a baseline example and does not use blocking. It is not
+# suitable for large datasets.
+#
+# Uses a threshold for similarity to link entities.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A dataset to be compared with Y.
+# Y               matrix  ---       A dataset to be compared with X.
+# threshold       Double  ---       Similarity threshold which is used to decide if two
+#                                    entities are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# XY_pairs     matrix   An adjacency matrix defining the found duplicates between X and Y.
+#                        Shape is (nrow(X), nrow(Y)).
+# --------------------------------------------------------------------------------------------
+binary_entity_resolution_pipeline = function(Matrix[Double] X, Matrix[Double] Y, Double threshold) return (Matrix[Double] XY_pairs) {
+  XY_sim = match::cosine2(X, Y);
+  XY_pairs = match::tresholding(XY_sim, threshold);
+}
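The cross-similarity that `cosine2` computes, followed by thresholding, can be sketched in NumPy (illustrative names, with a small epsilon guard against zero rows as in the DML `cosine2_eps` variant):

```python
import numpy as np

def cosine2(X, Y):
    # Cosine similarity between every row of X and every row of Y.
    Xn = X / np.maximum(np.linalg.norm(X, axis=1, keepdims=True), 1e-12)
    Yn = Y / np.maximum(np.linalg.norm(Y, axis=1, keepdims=True), 1e-12)
    return Xn @ Yn.T

def binary_entity_resolution(X, Y, threshold):
    sim = cosine2(X, Y)
    return (sim > threshold) * sim   # shape (nrow(X), nrow(Y))
```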
+
+# Binary entity resolution pipeline using locality-sensitive hashing as a blocking algorithm
+# to improve runtime on large datasets.
+#
+# The tradeoff between accuracy and performance can be configured
+# via the num_hashtables and num_hyperplanes parameters.
+#
+# For more details on LSH, see:
+#  Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+#  Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+#
+# Uses a threshold for similarity to link entities.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME            TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X               matrix  ---       A dataset to be compared with Y.
+# Y               matrix  ---       A dataset to be compared with X.
+# num_hashtables  Integer ---       How often to block the dataset using random hyperplanes and
+#                                     compute similarities.
+#                                     Increases runtime and improves accuracy.
+# num_hyperplanes Integer ---       The dimensionality of the random hyperplanes.
+#                                     Higher values produce smaller blocks and require a higher
+#                                     number of num_hashtables to avoid losing accuracy.
+# threshold       Double  ---       Similarity threshold which is used to decide if two
+#                                    entities are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# XY_pairs     matrix   An adjacency matrix defining the found duplicates between X and Y.
+#                        Shape is (nrow(X), nrow(Y)).
+# --------------------------------------------------------------------------------------------
+binary_entity_resolution_pipeline_lsh = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] XY_pairs) {
+  XY_pairs = matrix(0, nrow(X), nrow(Y));
+  for (hashtable in 1:num_hashtables, check = 0) {
+    [X_index, X_hash, X_sorted, Y_index, Y_hash, Y_sorted, blocks] = block::lsh_blocking2(X, Y, num_hyperplanes);
+    XY_pairs_local = matrix(0, nrow(X), nrow(Y));
+    parfor (i in 1:nrow(blocks)-1, check = 0) {
+      block_start = blocks[i,];
+      block_end = blocks[i+1,];
+      x_start = as.scalar(block_start[1,1]);
+      x_end = as.scalar(block_end[1,1]) - 1;
+
+      y_start = as.scalar(block_start[1,2]);
+      y_end = as.scalar(block_end[1,2]) - 1;
+      if ((x_end - x_start + 1) > 0 & (y_end - y_start + 1) > 0) {
+        X_block = X_sorted[x_start:x_end,];
+        Y_block = Y_sorted[y_start:y_end,];
+
+        XY_sim = match::cosine2(X_block, Y_block);
+        XY_pairs_block = match::tresholding(XY_sim, threshold);
+        # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
+        if (sum(XY_pairs_block) > 0) {
+          XY_pairs_local[x_start:x_end,y_start:y_end] = XY_pairs_block;
+        }
+      }
+    }
+    XY_pairs_local = block::reindex_rowwise(XY_pairs_local, X_index);
+    XY_pairs_local = t(XY_pairs_local);
+    XY_pairs_local = block::reindex_rowwise(XY_pairs_local, Y_index);
+    XY_pairs = XY_pairs | t(XY_pairs_local);
+  }
+}
diff --git a/scripts/staging/entity-resolution/primitives/postprocessing.dml b/scripts/staging/entity-resolution/primitives/postprocessing.dml
new file mode 100644
index 0000000..2110dfa
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/postprocessing.dml
@@ -0,0 +1,80 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Inverse operation of table. Converts a contingency table/adjacency matrix to a list of tuples
+# in the form (id1, id2, value) representing edges in the case of an adjacency matrix.
+#
+# The offsets can be used in order to restore the original ids in case the input to table was
+# concatenated from two datasets.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME              TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X                 matrix  ---       A contingency table/adjacency matrix.
+# row_offset        Integer ---       The number of rows of the first concatenated dataset.
+#                                      Use 0 for the default case.
+# col_offset        Integer ---       The number of columns of the first concatenated dataset.
+#                                      Use 0 for the default case.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# SPARSE        matrix   List of tuples in the form (id1, id2, value).
+# --------------------------------------------------------------------------------------------
+untable_offset = function(Matrix[Double] X, Integer row_offset, Integer col_offset) return (Matrix[Double] SPARSE) {
+  n_row = nrow(X);
+  n_col = ncol(X);
+  n = n_row * n_col;
+
+  mat1 = seq(1+row_offset,n_row+row_offset) %*% matrix(1, rows=1, cols=n_col);
+  col1 = matrix(mat1, rows=n, cols=1, byrow=TRUE);
+  # col1 is a vector of indices repeated like 111 222 333 ...
+
+  mat2 = seq(1+col_offset,n_col+col_offset) %*% matrix(1, rows=1, cols=n_row);
+  col2 = matrix(mat2, rows=n, cols=1, byrow=FALSE);
+  # col2 is a vector of indices repeated like 123 123 123 ...
+
+  col3 = matrix(X, rows=n, cols=1, byrow=TRUE);
+  dense = cbind(col1, col2, col3);
+
+  SPARSE = removeEmpty(target=dense, margin="rows", select=dense[,3]);
+}
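The `untable_offset` logic (adjacency matrix to 1-based (id1, id2, value) triples, dropping zero entries) is easy to mirror in NumPy; a hypothetical sketch using `nonzero` instead of the DML index-replication trick:

```python
import numpy as np

def untable(X, row_offset=0, col_offset=0):
    # Convert an adjacency/contingency matrix into (id1, id2, value) triples,
    # keeping only nonzero entries; ids are 1-based like in the DML primitive.
    rows, cols = np.nonzero(X)
    vals = X[rows, cols]
    return np.column_stack([rows + 1 + row_offset, cols + 1 + col_offset, vals])
```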
+
+# Inverse operation of table. Converts a contingency table/adjacency matrix to a list of tuples
+# in the form (id1, id2, value) representing edges in the case of an adjacency matrix.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME              TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# X                 matrix  ---       A contingency table/adjacency matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# SPARSE        matrix   List of tuples in the form (id1, id2, value).
+# --------------------------------------------------------------------------------------------
+untable = function(Matrix[Double] X) return (Matrix[Double] SPARSE) {
+  SPARSE = untable_offset(X, 0, 0);
+}
\ No newline at end of file
diff --git a/scripts/staging/entity-resolution/primitives/preprocessing.dml b/scripts/staging/entity-resolution/primitives/preprocessing.dml
new file mode 100644
index 0000000..58c8431
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/preprocessing.dml
@@ -0,0 +1,85 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Converts a dataframe with form (id, token, weight) into a contingency table bag-of-words
+# representation.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME              TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# FX                frame  ---       A dataframe with form (id, token, weight).
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix   A contingency table. Shape is (num_unique_ids, num_unique_tokens).
+# MX            frame    The recoding meta-information for ids and tokens that is needed
+#                         to convert indices in the X matrix back to their original
+#                         id/token.
+# --------------------------------------------------------------------------------------------
+convert_frame_tokens_to_matrix_bow = function(Frame[Unknown] FX) return (Matrix[Double] X, Frame[String] MX) {
+  jspecx = "{recode:[C1,C2]}";
+  [X0, MX] = transformencode(target=FX, spec=jspecx);
+  X = table(X0[,1], X0[,2], X0[,3]);
+}
+
+# Converts two dataframes with form (id, token, weight) into contingency table bag-of-words
+# representations. Makes sure both contingency tables are using the same vocabulary.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME              TYPE    DEFAULT   MEANING
+# --------------------------------------------------------------------------------------------
+# FX                frame  ---       A dataframe with form (id, token, weight).
+# FY                frame  ---       A dataframe with form (id, token, weight).
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME          TYPE     MEANING
+# --------------------------------------------------------------------------------------------
+# X             matrix   The contingency table for FX.
+#                         Shape is (X_num_unique_ids, XY_num_unique_tokens).
+#                         Uses same token order and encoding as Y.
+# Y             matrix   The contingency table for FY.
+#                         Shape is (Y_num_unique_ids, XY_num_unique_tokens).
+#                         Uses same token order and encoding as X.
+# M_tokens      frame    The recoding meta-information for tokens that is needed
+#                         to convert column indices in the contingency tables back
+#                         to their token strings.
+# MX_ids        frame    The recoding meta-information for X ids that is needed
+#                         to convert row indices in X back to ids for FX.
+# MY_ids        frame    The recoding meta-information for Y ids that is needed
+#                         to convert row indices in Y back to ids for FY.
+# --------------------------------------------------------------------------------------------
+convert_frame_tokens_to_matrix_bow_2 = function(Frame[Unknown] FX, Frame[Unknown] FY) return (Matrix[Double] X, Matrix[Double] Y, Frame[String] M_tokens, Frame[String] MX_ids, Frame[String] MY_ids) {
+  [E_tokens, M_tokens] = transformencode(target=rbind(FX[,2], FY[,2]), spec="{recode:[C1]}");
+  [Y_ids, MY_ids] = transformencode(target=FY[,1], spec="{recode:[C1]}");
+  [X_ids, MX_ids] = transformencode(target=FX[,1], spec="{recode:[C1]}");
+
+  X_tokens = E_tokens[1:nrow(FX),];
+  Y_tokens = E_tokens[(nrow(FX)+1):nrow(E_tokens),];
+
+  ncols = max(max(X_tokens), max(Y_tokens));
+  X = table(X_ids[,1], X_tokens[,1], as.matrix(FX[,3]), nrow(X_ids), ncols);
+  Y = table(Y_ids[,1], Y_tokens[,1], as.matrix(FY[,3]), nrow(Y_ids), ncols);
+}
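The conversion from (id, token, weight) records to a bag-of-words contingency table can be sketched in plain Python. Note an assumption: the DML version derives its row/column order from `transformencode`'s recode encoding, while this sketch simply sorts ids and vocabulary; names are ours:

```python
import numpy as np

def tokens_to_bow(records):
    # records: list of (id, token, weight) tuples, like the FX frame.
    ids = sorted({r[0] for r in records})
    vocab = sorted({r[1] for r in records})
    id_ix = {v: i for i, v in enumerate(ids)}
    tok_ix = {v: i for i, v in enumerate(vocab)}
    X = np.zeros((len(ids), len(vocab)))
    for rid, tok, w in records:
        X[id_ix[rid], tok_ix[tok]] += w
    return X, ids, vocab
```

Building both datasets against a shared vocabulary (as `convert_frame_tokens_to_matrix_bow_2` does) then amounts to computing `vocab` over the union of both token columns before filling either table.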
diff --git a/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java b/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
index 8b3798d..c36d53f 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
@@ -40,6 +40,7 @@ import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.common.Types.OpOpN;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.instructions.cp.Data;
@@ -310,7 +311,7 @@ public class LiteralReplacement
 			
 			if(    data instanceof DataOp && vars.keySet().contains(data.getName())
 				&& isIntValueDataLiteral(rl, vars) && isIntValueDataLiteral(ru, vars) 
-				&& isIntValueDataLiteral(cl, vars) && isIntValueDataLiteral(cu, vars)  ) 
+				&& isIntValueDataLiteral(cl, vars) && isIntValueDataLiteral(cu, vars)  )
 			{
 				long rlval = getIntValueDataLiteral(rl, vars);
 				long ruval = getIntValueDataLiteral(ru, vars);
@@ -435,10 +436,10 @@ public class LiteralReplacement
 
 	private static boolean isIntValueDataLiteral(Hop h, LocalVariableMap vars)
 	{
-		return (  (h instanceof DataOp && vars.keySet().contains(h.getName())) 
-				|| h instanceof LiteralOp
-				||(h instanceof UnaryOp && (((UnaryOp)h).getOp()==OpOp1.NROW || ((UnaryOp)h).getOp()==OpOp1.NCOL)
-				   && h.getInput().get(0) instanceof DataOp && vars.keySet().contains(h.getInput().get(0).getName())) );
+		return ( (h instanceof DataOp && vars.keySet().contains(h.getName())) 
+			|| h instanceof LiteralOp
+			||(h instanceof UnaryOp && (((UnaryOp)h).getOp()==OpOp1.NROW || ((UnaryOp)h).getOp()==OpOp1.NCOL)
+				&& h.getInput().get(0) instanceof DataOp && vars.keySet().contains(h.getInput().get(0).getName())) );
 	}
 	
 	private static long getIntValueDataLiteral(Hop hop, LocalVariableMap vars)
@@ -447,26 +448,22 @@ public class LiteralReplacement
 		
 		try 
 		{
-			if( hop instanceof LiteralOp )
-			{
+			if( hop instanceof LiteralOp ) {
 				value = HopRewriteUtils.getIntValue((LiteralOp)hop);
 			}
-			else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NROW )
-			{
+			else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NROW ) {
 				//get the dimension information from the matrix object because the hop
 				//dimensions might not have been updated during recompile
-				MatrixObject mo = (MatrixObject)vars.get(hop.getInput().get(0).getName());
+				CacheableData<?> mo = (CacheableData<?>)vars.get(hop.getInput().get(0).getName());
 				value = mo.getNumRows();
 			}
-			else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NCOL )
-			{
+			else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NCOL ) {
 				//get the dimension information from the matrix object because the hop
 				//dimensions might not have been updated during recompile
-				MatrixObject mo = (MatrixObject)vars.get(hop.getInput().get(0).getName());
+				CacheableData<?> mo = (CacheableData<?>)vars.get(hop.getInput().get(0).getName());
 				value = mo.getNumColumns();
 			}
-			else
-			{
+			else {
 				ScalarObject sdat = (ScalarObject) vars.get(hop.getName());
 				value = sdat.getLongValue();
 			}
@@ -522,5 +519,4 @@ public class LiteralReplacement
 		
 		return val;
 	}
-
 }
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index c87490a..602393e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -335,6 +335,14 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
 		return _metaData.getDataCharacteristics();
 	}
 
+	public long getNumRows() {
+		return getDataCharacteristics().getRows();
+	}
+
+	public long getNumColumns() {
+		return getDataCharacteristics().getCols();
+	}
+	
 	public abstract void refreshMetaData();
 
 	/**
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index f850e73..7509c02 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -177,14 +177,6 @@ public class MatrixObject extends CacheableData<MatrixBlock>
 		mc.setNonZeros( _data.getNonZeros() );
 	}
 
-	public long getNumRows() {
-		return getDataCharacteristics().getRows();
-	}
-
-	public long getNumColumns() {
-		return getDataCharacteristics().getCols();
-	}
-
 	public long getBlocksize() {
 		return getDataCharacteristics().getBlocksize();
 	}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java
new file mode 100644
index 0000000..85a6d09
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class EntityResolutionBinaryTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "EntityResolutionBinary";
+	private final static String TEST_DIR = "applications/entity_resolution/binary/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_DIR, TEST_NAME);
+	}
+
+	@Test
+	public void testParams1() {
+		testScriptEndToEnd(1, 1);
+	}
+	@Test
+	public void testParams2() {
+		testScriptEndToEnd(1, 3);
+	}
+	@Test
+	public void testParams3() {
+		testScriptEndToEnd(3, 1);
+	}
+	@Test
+	public void testParams4() {
+		testScriptEndToEnd(5, 5);
+	}
+
+	public void testScriptEndToEnd(int numLshHashtables, int numLshHyperplanes) {
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		loadTestConfiguration(config);
+		fullDMLScriptName = "./scripts/staging/entity-resolution/binary-entity-resolution.dml";
+
+		programArgs = new String[]{
+				"-nvargs", //
+				"FX=" + sourceDirectory + "input.csv", //
+				"FY=" + sourceDirectory + "input.csv", //
+				"OUT=" + output("B"), //
+				"num_hashtables=" + numLshHashtables,
+				"num_hyperplanes=" + numLshHyperplanes,
+		};
+
+		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+		// LSH is not deterministic, so in this test we just assert that it runs and produces a file
+		Assert.assertTrue(Files.exists(Paths.get(output("B"))));
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java
new file mode 100644
index 0000000..36bf0e5
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Test;
+
+public class EntityResolutionBlockingTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "EntityResolutionBlocking";
+	private final static String TEST_DIR = "applications/entity_resolution/blocking/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_DIR, "blocking_naive", new String[]{"B"}));
+	}
+
+	@Test
+	public void testNaive1() {
+		testNaiveBlocking(
+			new double[][]{{0,},}, 
+			10,
+			new double[][]{{1,},{2,},}
+		);
+	}
+	@Test
+	public void testNaive2() {
+		testNaiveBlocking(
+			new double[][]{{0,},{1,},},
+			1,
+			new double[][]{{1,},{3,},}
+		);
+	}
+	@Test
+	public void testNaive3() {
+		testNaiveBlocking(
+			new double[][]{{0,},{1,},},
+			2,
+			new double[][]{{1,},{2,},{3,},});
+	}
+	@Test
+	public void testNaive4() {
+		testNaiveBlocking(
+			new double[][]{{0,},{1,},{2,},},
+			2,
+			new double[][]{{1,}, {3,},{4,},});
+	}
+
+	public void testNaiveBlocking(double[][] dataset, int targetNumBlocks, double[][] expectedBlockingIndices) {
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		loadTestConfiguration(config);
+		fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml";
+
+		programArgs = new String[]{
+				"-nvargs", //
+				"inFile=" + input("A"), //
+				"outFile=" + output("B"), //
+				"targetNumBlocks=" + targetNumBlocks
+		};
+		writeInputMatrixWithMTD("A", dataset, false);
+		writeExpectedMatrix("B", expectedBlockingIndices);
+		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+		compareResults();
+	}
+}
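The four naive-blocking cases above all encode one pattern: for n input rows split into targetNumBlocks, the expected output is the 1-based start index of each block plus a final end sentinel (n+1), with block size ceil(n / targetNumBlocks). A minimal Python sketch of that reading of the expected matrices (the helper name is ours, not part of the DML primitive):

```python
import math

def naive_block_boundaries(num_rows, target_num_blocks):
    """1-based block start indices plus an end sentinel (num_rows + 1),
    mirroring the expected matrices in EntityResolutionBlockingTest."""
    size = math.ceil(num_rows / target_num_blocks)
    boundaries = list(range(1, num_rows + 1, size))
    boundaries.append(num_rows + 1)
    return boundaries
```

Each test case above corresponds to one call, e.g. testNaive4 is `naive_block_boundaries(3, 2)` yielding `[1, 3, 4]`.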
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java
new file mode 100644
index 0000000..daf751c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class EntityResolutionClusteringTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "EntityResolutionClustering";
+	private final static String TEST_DIR = "applications/entity_resolution/clustering/";
+
+	enum BlockingMethod {
+		NAIVE,
+		LSH,
+	}
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_DIR, TEST_NAME);
+	}
+
+	@Test
+	public void testNaive1() throws IOException {
+		testScriptEndToEnd(0.3, 1, BlockingMethod.NAIVE, 0, 0);
+	}
+	@Test
+	public void testLSH1() throws IOException {
+		testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 1, 1);
+	}
+	@Test
+	public void testLSH2() throws IOException {
+		testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 1, 3);
+	}
+	@Test
+	public void testLSH3() throws IOException {
+		testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 3, 1);
+	}
+	@Test
+	public void testLSH4() throws IOException {
+		testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 5, 5);
+	}
+
+	public void testScriptEndToEnd(double threshold, int numBlocks, BlockingMethod blockingMethod, int numLshHashtables, int numLshHyperplanes) throws IOException {
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		loadTestConfiguration(config);
+		fullDMLScriptName = "./scripts/staging/entity-resolution/entity-clustering.dml";
+
+		programArgs = new String[]{
+				"-nvargs", //
+				"FX=" + sourceDirectory + "input.csv", //
+				"OUT=" + output("B"), //
+				"threshold=" + threshold,
+				"num_blocks=" + numBlocks,
+				"blocking_method=" + (blockingMethod == BlockingMethod.LSH ? "lsh" : "naive"),
+				"num_hashtables=" + numLshHashtables,
+				"num_hyperplanes=" + numLshHyperplanes,
+		};
+
+		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+		// LSH is not deterministic, so in this test we just assert that it runs and produces a file
+		if (blockingMethod == BlockingMethod.LSH) {
+			Assert.assertTrue(Files.exists(Paths.get(output("B"))));
+			return;
+		}
+
+		Files.copy(Paths.get(sourceDirectory + "expected.csv"), Paths.get(output("expected.csv")), StandardCopyOption.REPLACE_EXISTING);
+		Files.copy(Paths.get(sourceDirectory + "expected.csv.mtd"), Paths.get(output("expected.csv.mtd")), StandardCopyOption.REPLACE_EXISTING);
+
+		FrameBlock expectedPairs = readDMLFrameFromHDFS("expected.csv", Types.FileFormat.CSV);
+		FrameBlock predictedPairs = readDMLFrameFromHDFS("B", Types.FileFormat.CSV);
+
+		Iterator<Object[]> expectedIter = expectedPairs.getObjectRowIterator();
+		Iterator<Object[]> predictedIter = predictedPairs.getObjectRowIterator();
+
+		int row = 0;
+		while (expectedIter.hasNext()) {
+			Assert.assertTrue(predictedIter.hasNext());
+			Object[] expected = Arrays.copyOfRange(expectedIter.next(), 0, 2);
+			Object[] predicted = Arrays.copyOfRange(predictedIter.next(), 0, 2);
+			Assert.assertArrayEquals("Row " + row + " differs.", expected, predicted);
+			row++;
+		}
+		Assert.assertEquals(expectedPairs.getNumRows(), predictedPairs.getNumRows());
+		tearDown();
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java
new file mode 100644
index 0000000..44fee8b
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Test;
+
+public class EntityResolutionConnectedComponentsTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "EntityResolutionConnectedComponents";
+	private final static String TEST_DIR = "applications/entity_resolution/connected_components/";
+
+	@Override
+	public void setUp() {
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_DIR, "cluster_by_connected_components", new String[]{"B"}));
+	}
+
+	@Test
+	public void testConnectedComponents1() {
+		testClusterByConnectedComponent(
+			new double[][]{{0,},},
+			new double[][]{{0,}}
+		);
+	}
+	@Test
+	public void testConnectedComponents2() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 0},
+				{0, 0},
+			},
+			new double[][]{
+				{0, 0},
+				{0, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents3() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 1},
+				{1, 0},
+			},
+			new double[][]{
+				{0, 1},
+				{1, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents4() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 1, 0},
+				{1, 0, 1},
+				{0, 1, 0},
+			},
+			new double[][]{
+				{0, 1, 1},
+				{1, 0, 1},
+				{1, 1, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents5() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 0, 1, 0, 0, 0},
+				{0, 0, 0, 1, 0, 0},
+				{1, 0, 0, 0, 1, 0},
+				{0, 1, 0, 0, 0, 0},
+				{0, 0, 1, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0},
+			},
+			new double[][]{
+				{0, 0, 1, 0, 1, 0},
+				{0, 0, 0, 1, 0, 0},
+				{1, 0, 0, 0, 1, 0},
+				{0, 1, 0, 0, 0, 0},
+				{1, 0, 1, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents6() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+			},
+			new double[][]{
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 0, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents7() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 1, 0, 1, 0, 0, 0},
+				{1, 0, 1, 1, 0, 0, 0},
+				{0, 1, 0, 0, 0, 0, 0},
+				{1, 1, 0, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 1, 1},
+				{0, 0, 0, 0, 1, 0, 1},
+				{0, 0, 0, 0, 1, 1, 0},
+			},
+			new double[][]{
+				{0, 1, 1, 1, 0, 0, 0},
+				{1, 0, 1, 1, 0, 0, 0},
+				{1, 1, 0, 1, 0, 0, 0},
+				{1, 1, 1, 0, 0, 0, 0},
+				{0, 0, 0, 0, 0, 1, 1},
+				{0, 0, 0, 0, 1, 0, 1},
+				{0, 0, 0, 0, 1, 1, 0},
+			}
+		);
+	}
+	@Test
+	public void testConnectedComponents8() {
+		testClusterByConnectedComponent(
+			new double[][]{
+				{0, 1, 1, 1, 1, 1, 1},
+				{1, 0, 1, 1, 1, 1, 1},
+				{1, 1, 0, 1, 1, 1, 1},
+				{1, 1, 1, 0, 1, 1, 1},
+				{1, 1, 1, 1, 0, 1, 1},
+				{1, 1, 1, 1, 1, 0, 1},
+				{1, 1, 1, 1, 1, 1, 0},
+			},
+			new double[][]{
+				{0, 1, 1, 1, 1, 1, 1},
+				{1, 0, 1, 1, 1, 1, 1},
+				{1, 1, 0, 1, 1, 1, 1},
+				{1, 1, 1, 0, 1, 1, 1},
+				{1, 1, 1, 1, 0, 1, 1},
+				{1, 1, 1, 1, 1, 0, 1},
+				{1, 1, 1, 1, 1, 1, 0},
+			}
+		);
+	}
+
+	public void testClusterByConnectedComponent(double[][] adjacencyMatrix, double[][] expectedMatrix) {
+		TestConfiguration config = getTestConfiguration(TEST_NAME);
+		loadTestConfiguration(config);
+		fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml";
+		programArgs = new String[]{"-nvargs",
+			"inFile=" + input("A"), "outFile=" + output("B")};
+		writeInputMatrixWithMTD("A", adjacencyMatrix, false);
+		writeExpectedMatrix("B", expectedMatrix);
+		runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+		compareResults(0.01);
+	}
+}
diff --git a/src/test/scripts/applications/entity_resolution/binary/expected.csv b/src/test/scripts/applications/entity_resolution/binary/expected.csv
new file mode 100644
index 0000000..fe9af4f
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/expected.csv
@@ -0,0 +1,6 @@
+itemA,itemC,1.0
+itemA,itemD,1.0
+itemC,itemA,1.0
+itemC,itemD,1.0
+itemD,itemA,1.0
+itemD,itemC,1.0
diff --git a/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd b/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd
new file mode 100644
index 0000000..fc4fde8
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd
@@ -0,0 +1,7 @@
+{
+  "data_type": "frame",
+  "schema": "STRING,STRING,FP64,",
+  "rows": 6,
+  "cols": 3,
+  "format": "csv"
+}
\ No newline at end of file
diff --git a/src/test/scripts/applications/entity_resolution/binary/input.csv b/src/test/scripts/applications/entity_resolution/binary/input.csv
new file mode 100644
index 0000000..51d4bc9
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/input.csv
@@ -0,0 +1,7 @@
+itemA,tokenA,1
+itemA,tokenC,1
+itemB,tokenB,1
+itemC,tokenC,1
+itemC,tokenD,2
+itemD,tokenA,1
+itemD,tokenD,2
diff --git a/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd b/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd
new file mode 100644
index 0000000..1a72ed3
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd
@@ -0,0 +1,7 @@
+{
+  "data_type": "frame",
+  "rows": 7,
+  "cols": 3,
+  "format": "csv",
+  "header": false
+}
\ No newline at end of file
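The expected pairs in expected.csv are consistent with cosine similarity over the token-count vectors in input.csv at threshold 0.3: itemA/itemC and itemA/itemD each share one token (similarity ≈ 0.316), itemC/itemD share tokenD with count 2 (similarity 0.8), and itemB shares no token with anything. A hedged sketch of that check, assuming cosine similarity is the measure the pipeline applies:

```python
from math import sqrt

# Token-count vectors transcribed from input.csv
items = {
    "itemA": {"tokenA": 1, "tokenC": 1},
    "itemB": {"tokenB": 1},
    "itemC": {"tokenC": 1, "tokenD": 2},
    "itemD": {"tokenA": 1, "tokenD": 2},
}

def cosine(a, b):
    dot = sum(a[t] * b[t] for t in a if t in b)
    return dot / (sqrt(sum(v * v for v in a.values()))
                  * sqrt(sum(v * v for v in b.values())))

def similar_pairs(items, threshold):
    # Symmetric pair list, mirroring expected.csv's (itemX, itemY, 1.0) rows.
    names = sorted(items)
    return sorted((x, y) for x in names for y in names
                  if x != y and cosine(items[x], items[y]) > threshold)
```

Running `similar_pairs(items, 0.3)` reproduces exactly the six rows of expected.csv.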
diff --git a/.github/workflows/applicationTests.yml b/src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
similarity index 60%
copy from .github/workflows/applicationTests.yml
copy to src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
index 652b31a..642561a 100644
--- a/.github/workflows/applicationTests.yml
+++ b/src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
@@ -19,31 +19,8 @@
 #
 #-------------------------------------------------------------
 
-name: Application Test
+source("scripts/staging/entity-resolution/primitives/blocking.dml") as blocking;
 
-on:
-  push:
-    branches:
-      - master
-  pull_request:
-    branches:
-      - master
-
-jobs:
-  applicationsTests:
-    runs-on: ${{ matrix.os }}
-    strategy:
-      fail-fast: false
-      matrix:
-        tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
-        os: [ubuntu-latest]
-    name:  Ap Test ${{ matrix.tests }} 
-    steps:
-    - name: Checkout Repository
-      uses: actions/checkout@v2
-
-    - name: Run all tests starting with "${{ matrix.tests }}"
-      uses: ./.github/action/
-      id: test
-      with:
-        test-to-run: org.apache.sysds.test.applications.${{ matrix.tests }}**
+A = read($inFile);
+B = blocking::naive_blocking(A, $targetNumBlocks);
+write(B, $outFile);
\ No newline at end of file
diff --git a/src/test/scripts/applications/entity_resolution/clustering/expected.csv b/src/test/scripts/applications/entity_resolution/clustering/expected.csv
new file mode 100644
index 0000000..fe9af4f
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/expected.csv
@@ -0,0 +1,6 @@
+itemA,itemC,1.0
+itemA,itemD,1.0
+itemC,itemA,1.0
+itemC,itemD,1.0
+itemD,itemA,1.0
+itemD,itemC,1.0
diff --git a/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd b/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd
new file mode 100644
index 0000000..fc4fde8
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd
@@ -0,0 +1,7 @@
+{
+  "data_type": "frame",
+  "schema": "STRING,STRING,FP64,",
+  "rows": 6,
+  "cols": 3,
+  "format": "csv"
+}
\ No newline at end of file
diff --git a/src/test/scripts/applications/entity_resolution/clustering/input.csv b/src/test/scripts/applications/entity_resolution/clustering/input.csv
new file mode 100644
index 0000000..51d4bc9
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/input.csv
@@ -0,0 +1,7 @@
+itemA,tokenA,1
+itemA,tokenC,1
+itemB,tokenB,1
+itemC,tokenC,1
+itemC,tokenD,2
+itemD,tokenA,1
+itemD,tokenD,2
diff --git a/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd b/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd
new file mode 100644
index 0000000..1a72ed3
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd
@@ -0,0 +1,7 @@
+{
+  "data_type": "frame",
+  "rows": 7,
+  "cols": 3,
+  "format": "csv",
+  "header": false
+}
\ No newline at end of file
diff --git a/.github/workflows/applicationTests.yml b/src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
similarity index 60%
copy from .github/workflows/applicationTests.yml
copy to src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
index 652b31a..be51c02 100644
--- a/.github/workflows/applicationTests.yml
+++ b/src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
@@ -19,31 +19,8 @@
 #
 #-------------------------------------------------------------
 
-name: Application Test
+source("scripts/staging/entity-resolution/primitives/clustering.dml") as cluster;
 
-on:
-  push:
-    branches:
-      - master
-  pull_request:
-    branches:
-      - master
-
-jobs:
-  applicationsTests:
-    runs-on: ${{ matrix.os }}
-    strategy:
-      fail-fast: false
-      matrix:
-        tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
-        os: [ubuntu-latest]
-    name:  Ap Test ${{ matrix.tests }} 
-    steps:
-    - name: Checkout Repository
-      uses: actions/checkout@v2
-
-    - name: Run all tests starting with "${{ matrix.tests }}"
-      uses: ./.github/action/
-      id: test
-      with:
-        test-to-run: org.apache.sysds.test.applications.${{ matrix.tests }}**
+A = read($inFile);
+B = cluster::cluster_by_connected_components(A);
+write(B, $outFile);
\ No newline at end of file
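The expected matrices in EntityResolutionConnectedComponentsTest all follow from one rule: every pair of distinct vertices in the same connected component of the input adjacency matrix becomes an edge, i.e. each component is completed into a clique with a zero diagonal. A minimal union-find sketch of that behavior (our reconstruction of what `cluster_by_connected_components` is expected to produce, not the DML implementation itself):

```python
def cluster_by_connected_components(adj):
    """Complete each connected component into a clique (zero diagonal),
    matching the expected matrices in the connected-components tests."""
    n = len(adj)
    parent = list(range(n))

    def find(i):
        while parent[i] != i:
            parent[i] = parent[parent[i]]  # path halving
            i = parent[i]
        return i

    # Union endpoints of every edge in the input adjacency matrix.
    for i in range(n):
        for j in range(n):
            if adj[i][j] != 0:
                parent[find(i)] = find(j)

    return [[1 if i != j and find(i) == find(j) else 0
             for j in range(n)] for i in range(n)]
```

For example, testConnectedComponents5 above has components {0, 2, 4}, {1, 3}, and {5}; completing them yields exactly its expected matrix.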