Posted to commits@systemds.apache.org by mb...@apache.org on 2020/07/28 19:03:46 UTC
[systemds] branch master updated: [SYSTEMDS-2593] Entity resolution
pipelines and primitives.
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new ee77fad [SYSTEMDS-2593] Entity resolution pipelines and primitives.
ee77fad is described below
commit ee77fadbb569d93f6738bb7b6083568a35cb3790
Author: Samuel Kogler <sa...@gmail.com>
AuthorDate: Tue Jul 28 20:45:46 2020 +0200
[SYSTEMDS-2593] Entity resolution pipelines and primitives.
Adds new scripts in `scripts/staging/entity-resolution` that demonstrate
entity clustering and binary entity resolution with SystemDS DML.
See the README at `scripts/staging/entity-resolution/README.md` for more
details.
This is a squash of all commits on branch master from the
skogler/systemml fork.
AMLS project SS2020.
Closes #993.
Co-authored-by: Markus Reiter-Haas <is...@gmail.com>
---
.github/workflows/applicationTests.yml | 2 +-
dev/Tasks-obsolete.txt | 1 +
docs/index.md | 1 +
docs/site/entity-resolution.md | 137 ++++++++++
.../entity-resolution/binary-entity-resolution.dml | 102 +++++++
.../entity-resolution/entity-clustering.dml | 117 +++++++++
.../entity-resolution/eval-entity-resolution.dml | 59 +++++
.../entity-resolution/primitives/blocking.dml | 292 +++++++++++++++++++++
.../entity-resolution/primitives/clustering.dml | 158 +++++++++++
.../entity-resolution/primitives/evaluation.dml | 142 ++++++++++
.../entity-resolution/primitives/matching.dml | 108 ++++++++
.../entity-resolution/primitives/pipeline.dml | 220 ++++++++++++++++
.../primitives/postprocessing.dml | 80 ++++++
.../entity-resolution/primitives/preprocessing.dml | 85 ++++++
.../sysds/hops/recompile/LiteralReplacement.java | 28 +-
.../controlprogram/caching/CacheableData.java | 8 +
.../controlprogram/caching/MatrixObject.java | 8 -
.../applications/EntityResolutionBinaryTest.java | 74 ++++++
.../applications/EntityResolutionBlockingTest.java | 81 ++++++
.../EntityResolutionClusteringTest.java | 116 ++++++++
.../EntityResolutionConnectedComponentsTest.java | 184 +++++++++++++
.../entity_resolution/binary/expected.csv | 6 +
.../entity_resolution/binary/expected.csv.mtd | 7 +
.../entity_resolution/binary/input.csv | 7 +
.../entity_resolution/binary/input.csv.mtd | 7 +
.../entity_resolution/blocking/blocking_naive.dml | 31 +--
.../entity_resolution/clustering/expected.csv | 6 +
.../entity_resolution/clustering/expected.csv.mtd | 7 +
.../entity_resolution/clustering/input.csv | 7 +
.../entity_resolution/clustering/input.csv.mtd | 7 +
.../cluster_by_connected_components.dml | 31 +--
31 files changed, 2040 insertions(+), 79 deletions(-)
diff --git a/.github/workflows/applicationTests.yml b/.github/workflows/applicationTests.yml
index 652b31a..5b36aab 100644
--- a/.github/workflows/applicationTests.yml
+++ b/.github/workflows/applicationTests.yml
@@ -35,7 +35,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
+ tests: [A,B,C,E,G,H,I,L,M,N,O,P,S,U,W]
os: [ubuntu-latest]
name: Ap Test ${{ matrix.tests }}
steps:
diff --git a/dev/Tasks-obsolete.txt b/dev/Tasks-obsolete.txt
index 561ef48..17dd708 100644
--- a/dev/Tasks-obsolete.txt
+++ b/dev/Tasks-obsolete.txt
@@ -236,6 +236,7 @@ SYSTEMDS-260 Misc Tools
* 262 Data augmentation tool for data cleaning OK
* 263 ONNX graph importer (Python API, docs, tests) OK
* 264 ONNX graph exporter
+ * 265 Entity resolution pipelines and primitives OK
SYSTEMDS-270 Compressed Matrix Blocks
* 271 Reintroduce compressed matrix blocks from SystemML OK
diff --git a/docs/index.md b/docs/index.md
index bb072ab..7b09939 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -36,6 +36,7 @@ Various forms of documentation for SystemDS are available.
- a [DML language reference](./site/dml-language-reference) for an list of operations possible inside SystemDS.
- [builtin functions](./site/builtins-reference) contains a collection of builtin functions providing an high level abstraction on complex machine learning algorithms.
+- [Entity Resolution](./site/entity-resolution) provides a collection of customizable entity resolution primitives and pipelines.
- [Run SystemDS](./site/run) contains an Helloworld example along with an environment setup guide.
- Instructions on python can be found at [Python Documentation](./api/python/index)
- The [javadoc API](./api/java/index) contains internal documentation of the system source code.
diff --git a/docs/site/entity-resolution.md b/docs/site/entity-resolution.md
new file mode 100644
index 0000000..593d712
--- /dev/null
+++ b/docs/site/entity-resolution.md
@@ -0,0 +1,137 @@
+---
+layout: site
+title: Entity Resolution
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+## Pipeline design and primitives
+
+We provide two example scripts, `entity-clustering.dml` and `binary-entity-resolution.dml`. These scripts handle
+reading input files and writing output files, and call the pipeline functions provided in `primitives/pipeline.dml`.
+
+The pipeline design is loosely based on the following paper, but does not use its advanced features such as multi-probe
+LSH, LSTM-based embedding composition, or learned classification:
+
+```
+Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+```
+
+### Input files
+
+The provided scripts can read two types of input files. The token file is mandatory since it contains the row identifiers,
+but the embedding file is optional. The actual use of tokens and/or embeddings can be configured via command line parameters
+to the scripts.
+
+##### Token files
+
+This file type is a CSV file with 3 columns. The first column is the string or integer row identifier, the second is the
+string token, and the third is the number of occurrences. This simple format is used as a bag-of-words representation.
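As an aside (not part of this commit), the mapping from such a token file to a bag-of-words representation can be sketched in Python; `tokens_to_bow` is an illustrative name, not a SystemDS function:

```python
import csv
import io

def tokens_to_bow(csv_text):
    """Parse token-file rows (id, token, count) into a nested dict:
    row id -> {token: count}, i.e. a bag-of-words representation."""
    bow = {}
    for row_id, token, count in csv.reader(io.StringIO(csv_text)):
        bow.setdefault(row_id, {})[token] = float(count)
    return bow

# Two rows: row "1" mentions "apache" twice and "systemds" once.
example = "1,apache,2\n1,systemds,1\n2,apache,1\n"
bow = tokens_to_bow(example)
```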
+
+##### Embedding files
+
+This file type is a CSV matrix file with each row containing arbitrary-dimensional embeddings. The order of row identifiers
+is assumed to be the same as in the token file. This saves some computation and storage time, but could be changed with
+some modifications to the example scripts.
+
+### Primitives
+
+While the example scripts may be sufficient for many simple use cases, we aim to provide a toolkit of composable functions
+to facilitate more complex tasks. The top-level pipelines are defined as a couple of functions in `primitives/pipeline.dml`.
+The goal is that it should be relatively easy to copy one of these pipelines and swap out the primitive functions used
+to create a custom pipeline.
+
+To convert the input token file into a bag-of-words contingency table representation, we provide the functions
+`convert_frame_tokens_to_matrix_bow` and `convert_frame_tokens_to_matrix_bow_2` in `primitives/preprocessing.dml`.
+The latter is used to compute a compatible contingency table with a matching vocabulary for binary entity resolution.
+
+We provide naive, constant-size blocking and locality-sensitive hashing (LSH) as functions in `primitives/blocking.dml`.
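The core idea behind random-hyperplane LSH blocking can be sketched as follows (a Python illustration, not the DML implementation; function and parameter names are assumptions):

```python
import numpy as np

def lsh_bucket_codes(X, num_hashtables, num_hyperplanes, seed=42):
    """For each hash table, project rows of X onto random hyperplanes and
    hash the resulting sign pattern into an integer bucket code. Rows that
    share a bucket code in any table become candidate pairs for matching."""
    rng = np.random.default_rng(seed)
    n, d = X.shape
    codes = np.empty((n, num_hashtables), dtype=np.int64)
    for t in range(num_hashtables):
        H = rng.uniform(-1, 1, size=(num_hyperplanes, d))       # one hyperplane per row of H
        bits = (X @ H.T > 0).astype(np.int64)                   # sign bit per hyperplane
        codes[:, t] = bits @ (2 ** np.arange(num_hyperplanes))  # bits -> integer code
    return codes

# Identical rows always land in the same bucket in every table.
X = np.vstack([np.ones((2, 8)), -np.ones((2, 8))])
codes = lsh_bucket_codes(X, num_hashtables=3, num_hyperplanes=4)
```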
+
+For entity clustering, we only provide a simple clustering approach which makes all connected components in an adjacency
+matrix fully connected. This function is located in `primitives/clustering.dml`.
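A minimal Python sketch of this clustering step, assuming a symmetric 0/1 adjacency matrix (illustrative only; the DML version lives in `primitives/clustering.dml`):

```python
import numpy as np

def cluster_connected_components(A):
    """Propagate the minimum label through each connected component of the
    symmetric adjacency matrix A, then make every component fully connected."""
    n = A.shape[0]
    labels = np.arange(n)
    changed = True
    while changed:  # iterate until labels stabilize
        changed = False
        for i in range(n):
            neigh = np.flatnonzero(A[i])
            if neigh.size and labels[neigh].min() < labels[i]:
                labels[i] = labels[neigh].min()
                changed = True
    C = (labels[:, None] == labels[None, :]).astype(int)
    np.fill_diagonal(C, 0)  # no self-loops
    return C

# Edges 0-1 and 1-2 form one component; after clustering, 0-2 is connected too.
A = np.zeros((4, 4), dtype=int)
for i, j in [(0, 1), (1, 2)]:
    A[i, j] = A[j, i] = 1
C = cluster_connected_components(A)
```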
+
+To convert an adjacency matrix back into a list of pairs, we provide the functions `untable` and `untable_offset` in
+`primitives/postprocessing.dml`.
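As a hedged illustration of what such an `untable` operation does (Python sketch, assuming only pairs above the diagonal are kept so each match appears once, with 1-based ids as in DML):

```python
import numpy as np

def untable(M):
    """Turn a similarity matrix into (row, col, value) triples with
    1-based indices, keeping each nonzero pair once (upper triangle)."""
    rows, cols = np.nonzero(np.triu(M, k=1))
    return [(int(i) + 1, int(j) + 1, float(M[i, j])) for i, j in zip(rows, cols)]

M = np.array([[0.0, 0.95, 0.0],
              [0.95, 0.0, 0.0],
              [0.0, 0.0, 0.0]])
pairs = untable(M)  # row 1 and row 2 match
```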
+
+Finally, `primitives/evaluation.dml` defines some metrics that can be used to evaluate the performance of the entity
+resolution pipelines. They are used in the script `eval-entity-resolution.dml`.
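The standard pair-based metrics such evaluation functions compute can be sketched as follows (Python illustration over 0/1 pair matrices; not the DML code):

```python
import numpy as np

def pair_metrics(PRED, GT):
    """Precision, recall, and F1 over matched pairs, comparing a 0/1
    prediction matrix against a 0/1 ground-truth matrix of the same shape."""
    tp = float(((PRED > 0) & (GT > 0)).sum())
    fp = float(((PRED > 0) & (GT == 0)).sum())
    fn = float(((PRED == 0) & (GT > 0)).sum())
    precision = tp / (tp + fp) if tp + fp > 0 else 0.0
    recall = tp / (tp + fn) if tp + fn > 0 else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0
    return precision, recall, f1

GT = np.array([[0, 1], [1, 0]])
PRED = np.array([[0, 1], [0, 0]])  # finds one direction of the pair only
metrics = pair_metrics(PRED, GT)
```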
+
+## Testing and Examples
+
+A test data repository used to develop these scripts is available
+[here](https://github.com/skogler/systemds-amls-project-data). In the examples below, it is assumed that this repo is
+cloned as `data` in the SystemDS root folder. The data in that repository is sourced from the Uni Leipzig entity resolution
+[benchmark datasets](https://dbs.uni-leipzig.de/research/projects/object_matching/benchmark_datasets_for_entity_resolution).
+
+### Preprocessing
+
+Since SystemDS does not provide tokenization functionality yet, the data repository includes a Python preprocessing
+script that tokenizes the text columns and performs a simple embedding lookup using GloVe embeddings.
+
+The tokens are written as CSV files to enable bag-of-words representations as well as matrices of combined embeddings.
+Depending on the type of data, one representation, the other, or a combination of both may work best. The SystemDS DML
+scripts can be called with different parameters to experiment with this.
+
+### Entity Clustering
+
+In this case we detect duplicates within one database. As an example, we use the benchmark dataset Affiliations from Uni Leipzig.
+For this dataset, embeddings do not work well since the data is mostly just names. Therefore, we encode it as Bag-of-Words vectors
+in the example below. This dataset would benefit from more preprocessing, as simply matching words for all the different kinds of
+abbreviations does not work particularly well.
+
+Example command to run on Affiliations dataset:
+```
+./bin/systemds ./scripts/staging/entity-resolution/entity-clustering.dml -nvargs FX=data/affiliationstrings/affiliationstrings_tokens.csv OUT=data/affiliationstrings/affiliationstrings_res.csv store_mapping=FALSE MX=data/affiliationstrings/affiliationstrings_MX.csv use_embeddings=FALSE XE=data/affiliationstrings/affiliationstrings_embeddings.csv
+```
+Evaluation:
+```
+./bin/systemds ./scripts/staging/entity-resolution/eval-entity-resolution.dml -nvargs FX=data/affiliationstrings/affiliationstrings_res.csv FY=data/affiliationstrings/affiliationstrings_mapping_fixed.csv
+```
+
+### Binary Entity Resolution
+
+In this case we detect duplicate pairs of rows between two databases. As an example, we use the benchmark dataset DBLP-ACM from Uni Leipzig.
+Embeddings work well for this dataset, so the results are quite good, with an F1 score of 0.89.
+
+Example command to run on DBLP-ACM dataset with embeddings:
+```
+./bin/systemds ./scripts/staging/entity-resolution/binary-entity-resolution.dml -nvargs FY=data/DBLP-ACM/ACM_tokens.csv FX=data/DBLP-ACM/DBLP2_tokens.csv MX=data/DBLP-ACM_MX.csv OUT=data/DBLP-ACM/DBLP-ACM_res.csv XE=data/DBLP-ACM/DBLP2_embeddings.csv YE=data/DBLP-ACM/ACM_embeddings.csv use_embeddings=TRUE
+```
+Evaluation:
+```
+./bin/systemds ./scripts/staging/entity-resolution/eval-entity-resolution.dml -nvargs FX=data/DBLP-ACM/DBLP-ACM_res.csv FY=data/DBLP-ACM/DBLP-ACM_perfectMapping.csv
+```
+
+## Future Work
+
+1. Better clustering algorithms.
+ 1. Correlation clustering.
+ 2. Markov clustering.
+ 3. See [this link](https://dbs.uni-leipzig.de/en/publication/title/comparative_evaluation_of_distributed_clustering_schemes_for_multi_source_entity_resolution) for more approaches.
+2. Multi-Probe LSH to improve runtime performance.
+ 1. Probably as a SystemDS built-in to be more efficient.
+3. Classifier-based matching.
+ 1. Use an SVM classifier to decide whether two tuples are duplicates, instead of a similarity threshold.
+4. Better/built-in tokenization.
+ 1. Implement text tokenization as a component of SystemDS.
+ 2. Offer choice of different preprocessing and tokenization algorithms (e.g. stemming, word-piece tokenization).
+5. Better/built-in embeddings.
+ 1. Implement embedding generation as a component of SystemDS.
+ 2. Use LSTM to compose embeddings.
\ No newline at end of file
diff --git a/scripts/staging/entity-resolution/binary-entity-resolution.dml b/scripts/staging/entity-resolution/binary-entity-resolution.dml
new file mode 100644
index 0000000..a8337e8
--- /dev/null
+++ b/scripts/staging/entity-resolution/binary-entity-resolution.dml
@@ -0,0 +1,102 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT PERFORMS AN ENTITY RESOLUTION PIPELINE FOR BINARY MATCHING ON TWO FILES
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# ---------------------------------------------------------------------------------------------
+# FX String --- Location to read the frame of tokens in bow format for the first dataset
+# Each line contains comma separated list of id, token and value
+# FY String --- Location to read the frame of tokens in bow format for the second dataset
+# Each line contains comma separated list of id, token and value
+# OUT String --- Location to save the output of matching pairs
+# Each line contains the comma separated ids of one matched pair
+# The first column holds ids from the first dataset, the second column ids from the second dataset
+# The third column provides the similarity score
+# threshold Double 0.9 Threshold to be considered as a match
+# num_hashtables Int 6 Number of hashtables for LSH blocking.
+# num_hyperplanes Int 4 Number of hyperplanes for LSH blocking.
+# use_tokens Boolean TRUE Whether to use the tokens of FX and FY to generate predictions
+# use_embeddings Boolean FALSE Whether to use the embeddings of XE and YE to generate predictions
+# XE String --- Location to read the frame of embedding matrix for the first dataset
+# Required if use_embeddings is set to TRUE
+# YE String --- Location to read the frame of embedding matrix for the second dataset
+# Required if use_embeddings is set to TRUE
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: frame of matching pairs
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/postprocessing.dml") as post;
+source("./scripts/staging/entity-resolution/primitives/preprocessing.dml") as pre;
+source("./scripts/staging/entity-resolution/primitives/pipeline.dml") as pipe;
+
+# Command Line Arguments
+fileFX = $FX;
+fileFY = $FY;
+fileOUT = $OUT;
+
+threshold = ifdef($threshold, 0.9);
+num_hashtables = ifdef($num_hashtables, 6);
+num_hyperplanes = ifdef($num_hyperplanes, 4);
+
+use_tokens = ifdef($use_tokens, TRUE);
+use_embeddings = ifdef($use_embeddings, FALSE);
+# file XE and YE is only required if using embeddings
+fileXE = ifdef($XE, "");
+fileYE = ifdef($YE, "");
+
+# Read data
+FX = read(fileFX);
+FY = read(fileFY);
+if (use_embeddings) {
+ if (fileXE == "" | fileYE == "") {
+ print("You need to specify files XE and YE when use_embeddings is set to TRUE");
+ } else {
+ X_embeddings = read(fileXE);
+ Y_embeddings = read(fileYE);
+ }
+}
+
+# Convert data
+[X, Y, M_tokens, MX_ids, MY_ids] = pre::convert_frame_tokens_to_matrix_bow_2(FX,FY);
+if (use_tokens & use_embeddings) {
+ X = cbind(X, X_embeddings);
+ Y = cbind(Y, Y_embeddings);
+} else if (use_tokens) {
+ # Nothing to do in this case, since X already contains tokens
+} else if (use_embeddings) {
+ X = X_embeddings;
+ Y = Y_embeddings;
+} else {
+ print("Either use_tokens or use_embeddings needs to be TRUE, using tokens only as default.");
+}
+# Perform matching
+THRES = pipe::binary_entity_resolution_pipeline_lsh(X, Y, num_hashtables, num_hyperplanes, threshold);
+sparse = post::untable(THRES);
+
+# Write results
+X_dec = transformdecode(target=sparse[,1], meta=MX_ids[,1], spec="{recode:[C1]}");
+Y_dec = transformdecode(target=sparse[,2], meta=MY_ids[,1], spec="{recode:[C1]}");
+output = cbind(cbind(X_dec, Y_dec), as.frame(sparse[,3]));
+write(output, fileOUT, sep=",", sparse=FALSE, format="csv");
diff --git a/scripts/staging/entity-resolution/entity-clustering.dml b/scripts/staging/entity-resolution/entity-clustering.dml
new file mode 100644
index 0000000..eb11df6
--- /dev/null
+++ b/scripts/staging/entity-resolution/entity-clustering.dml
@@ -0,0 +1,117 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT PERFORMS AN ENTITY RESOLUTION PIPELINE FOR CLUSTERING ON A SINGLE FILE
+# CONSISTS OF BLOCKING, MATCHING, AND CLUSTERING
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# ---------------------------------------------------------------------------------------------
+# FX String --- Location to read the frame of tokens in bow format
+# Each line contains comma separated list of id, token and value
+# OUT String --- Location to save the output of matching pairs
+# Each line contains comma separated ids of one matched pair
+# Third column provides the similarity score
+# threshold Double 0.9 Threshold to be considered as a match
+# blocking_method String naive Possible values: ["naive", "lsh"].
+# num_blocks Int 1 Number of blocks for naive blocking
+# num_hashtables Int 6 Number of hashtables for LSH blocking.
+# num_hyperplanes Int 4 Number of hyperplanes for LSH blocking.
+
+# use_tokens Boolean TRUE Whether to use the tokens of FX to generate predictions
+# use_embeddings Boolean FALSE Whether to use the embeddings of XE to generate predictions
+# XE String --- Location to read the frame of embedding matrix
+# Required if use_embeddings is set to TRUE
+# store_mapping Boolean FALSE Whether to store the mapping of transformencode
+# MX String --- Location to write the frame of mapping
+# Required if store_mapping is set to TRUE
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: frame of matching pairs
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/preprocessing.dml") as pre;
+source("./scripts/staging/entity-resolution/primitives/postprocessing.dml") as post;
+source("./scripts/staging/entity-resolution/primitives/pipeline.dml") as pipe;
+
+# Command Line Arguments
+fileFX = $FX;
+fileOUT = $OUT;
+
+threshold = ifdef($threshold, 0.9);
+blocking_method = ifdef($blocking_method, "lsh");
+num_blocks = ifdef($num_blocks, 1);
+num_hyperplanes = ifdef($num_hyperplanes, 4);
+num_hashtables = ifdef($num_hashtables, 6);
+use_tokens = ifdef($use_tokens, TRUE);
+use_embeddings = ifdef($use_embeddings, FALSE);
+# file XE is only required if using embeddings
+fileXE = ifdef($XE, "");
+# mapping file is required for evaluation
+store_mapping = ifdef($store_mapping, FALSE);
+fileMX = ifdef($MX, "");
+
+if (!(blocking_method == "naive" | blocking_method == "lsh")) {
+ print("ERROR: blocking method must be in ['naive', 'lsh']");
+}
+
+# Read data
+FX = read(fileFX);
+if (use_embeddings) {
+ if (fileXE == "")
+ print("You need to specify file XE when use_embeddings is set to TRUE");
+ else
+ X_embeddings = read(fileXE);
+}
+
+# Convert data
+[X, MX] = pre::convert_frame_tokens_to_matrix_bow(FX);
+if (use_tokens & use_embeddings) {
+ X = cbind(X, X_embeddings);
+} else if (use_tokens) {
+ # Nothing to do in this case, since X already contains tokens
+} else if (use_embeddings) {
+ X = X_embeddings;
+} else {
+ print("Either use_tokens or use_embeddings needs to be TRUE, using tokens only as default.");
+}
+
+if (store_mapping) {
+ if (fileMX == "")
+ print("You need to specify file MX when store_mapping is set to TRUE.");
+ else
+ write(MX, fileMX);
+}
+
+# Perform clustering
+if (blocking_method == "naive") {
+ CLUSTER = pipe::entity_clustering_pipeline(X, num_blocks, threshold);
+} else if (blocking_method == "lsh") {
+ CLUSTER = pipe::entity_clustering_pipeline_lsh(X, num_hashtables, num_hyperplanes, threshold);
+}
+MATCH = (CLUSTER > 0);
+
+# Write results
+sparse = post::untable(CLUSTER);
+dec = transformdecode(target=sparse, meta=cbind(MX[,1],MX[,1]), spec="{recode:[C1,C2]}");
+output = cbind(dec, as.frame(sparse[,3]));
+write(output, fileOUT, sep=",", sparse=FALSE, format="csv");
diff --git a/scripts/staging/entity-resolution/eval-entity-resolution.dml b/scripts/staging/entity-resolution/eval-entity-resolution.dml
new file mode 100644
index 0000000..cc48497
--- /dev/null
+++ b/scripts/staging/entity-resolution/eval-entity-resolution.dml
@@ -0,0 +1,59 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+#
+# THIS SCRIPT EVALUATES THE PREDICTIONS OF THE ENTITY RESOLUTION AGAINST THE GROUND TRUTH
+#
+# INPUT PARAMETERS:
+# ---------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# ---------------------------------------------------------------------------------------------
+# FX String --- Location to read the frame of the predictions
+# Each line contains comma separated ids of one matched pair
+# Remaining columns (>2) are ignored
+# FY String --- Location to read the frame of the ground truth
+# Each line contains comma separated ids of one matched pair
+# Remaining columns (>2) are ignored
+# ---------------------------------------------------------------------------------------------
+# OUTPUT: prints different evaluation metrics (accuracy, F1)
+# ---------------------------------------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/evaluation.dml") as eval;
+
+# Command Line Arguments
+fileFX = $FX;
+fileFY = $FY;
+
+use_MX = ifdef($use_MX, FALSE);
+
+# Read data
+FX = read(fileFX);
+FY = read(fileFY);
+
+# Transform the data
+[XY, MX] = transformencode(target=rbind(FX[,1:2],FY[,1:2]), spec="{recode:[C1,C2]}");
+X = XY[1:nrow(FX),];
+Y = XY[nrow(FX)+1:nrow(FX)+nrow(FY),];
+PRED = table(X[,1], X[,2], nrow(MX), nrow(MX));
+GT = table(Y[,1], Y[,2], nrow(MX), nrow(MX));
+
+# Perform the evaluation
+eval::print_eval_stats(PRED, GT);
diff --git a/scripts/staging/entity-resolution/primitives/blocking.dml b/scripts/staging/entity-resolution/primitives/blocking.dml
new file mode 100644
index 0000000..baa9aee
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/blocking.dml
@@ -0,0 +1,292 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Splits the rows of X into num_blocks non-overlapping regions.
+# May produce fewer than num_blocks regions.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A dataset with rows to split into blocks.
+# num_blocks Integer --- How many blocks to produce.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# BLOCKS Double A column vector with start indices for each block. Has one more row
+# than blocks such that the last row is the end index of the last block.
+# --------------------------------------------------------------------------------------------
+naive_blocking = function(Matrix[Double] X, Integer num_blocks) return (Matrix[Double] BLOCKS) {
+ block_size_flt = nrow(X) / num_blocks;
+ if (block_size_flt < 1.0) {
+ BLOCKS = seq(1, nrow(X) + 1);
+ } else {
+ block_size = ceil(block_size_flt);
+ BLOCKS = rbind(as.matrix(1), 1 + block_size * seq(1, num_blocks));
+ BLOCKS[num_blocks+1,] = nrow(X) + 1;
+ }
+}
+
+# Sorts the rows of dataset X by the vector v, in ascending order of v.
+# Optionally returns v as the first column of X, controlled by the parameter prepend_v.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Any matrix with rows to be sorted.
+# v matrix --- A vector with the same number of rows as X. Defines ordering.
+# prepend_v boolean --- Whether to return v as first column of X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_index matrix A vector with the original row number for each row in X.
+# X_sorted matrix The sorted input matrix X.
+# --------------------------------------------------------------------------------------------
+sort_by_vector = function(Matrix[Double] X, Matrix[Double] v, Boolean prepend_v) return (Matrix[Double] X_index, Matrix[Double] X_sorted) {
+ X_index = order(target=v, by=1, decreasing = FALSE, index.return=TRUE);
+ X_sorted = order(target=cbind(v, X), by=1, decreasing = FALSE, index.return=FALSE);
+ if (!prepend_v) {
+ X_sorted = X_sorted[,2:ncol(X_sorted)];
+ }
+}
+
+# Sorts the rows in dataset X by their sum.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Any matrix with rows to be sorted.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_index matrix A vector with the original row number for each row in X.
+# X_sorted matrix The sorted input matrix X.
+# --------------------------------------------------------------------------------------------
+row_sum_sorting = function(Matrix[Double] X) return (Matrix[Double] X_index, Matrix[Double] X_sorted) {
+ X_rowSum = rowSums(X);
+ [X_index, X_sorted] = sort_by_vector(X, X_rowSum, FALSE);
+}
+
+# Restores the original row order of dataset X using the given index.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Any matrix with rows to be sorted.
+# X_index matrix --- The original row numbers to be restored.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_reindex matrix The reindexed matrix X.
+# --------------------------------------------------------------------------------------------
+reindex_rowwise = function(Matrix[Double] X, Matrix[Double] X_index) return (Matrix[Double] X_reindex) {
+ X_conc = cbind(X_index, X);
+ X_reindex = order(target=X_conc, by=1, decreasing = FALSE, index.return=FALSE);
+ # Remove index column
+ X_reindex = X_reindex[,2:ncol(X_reindex)];
+}
+
+# Restores the original row and column order of dataset X using the given index.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Any matrix that should be resorted.
+# X_index matrix --- The original row numbers to be restored.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_reindex matrix The reindexed matrix X.
+# --------------------------------------------------------------------------------------------
+reindex_rows_and_cols = function(Matrix[Double] X, Matrix[Double] X_index) return (Matrix[Double] X_reindex) {
+ # First reindex rows
+ X_reindex = X;
+ X_reindex = reindex_rowwise(X_reindex, X_index);
+ # Then transpose and repeat
+ X_reindex = t(X_reindex);
+ X_reindex = reindex_rowwise(X_reindex, X_index);
+}
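The index-column trick used by `reindex_rowwise` and `reindex_rows_and_cols` (prepend the original row numbers as a column, sort the rows by it, then drop it) can be sketched in NumPy. This is an illustrative translation, not part of the commit; the function names simply mirror the DML:

```python
import numpy as np

def reindex_rowwise(X, X_index):
    # Prepend the original row numbers, sort rows by them, drop the index column.
    X_conc = np.column_stack([X_index, X])
    X_sorted = X_conc[np.argsort(X_conc[:, 0], kind="stable")]
    return X_sorted[:, 1:]

def reindex_rows_and_cols(X, X_index):
    # Restore the row order first, then transpose and repeat for the columns.
    X = reindex_rowwise(X, X_index)
    return reindex_rowwise(X.T, X_index)

# Rows of A were shuffled into the order [2, 0, 1] (0-based), so the original
# 1-based row numbers of the shuffled matrix are [3, 1, 2]:
A = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
X = A[[2, 0, 1]]
restored = reindex_rowwise(X, np.array([3.0, 1.0, 2.0]))  # equals A again
```

As in the DML, `reindex_rows_and_cols` omits a final transpose, so it is intended for the symmetric similarity matrices produced by the pipelines.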
+
+# Generates a random matrix of hyperplane parameters.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# num_hyperplanes Integer --- How many hyperplanes to generate.
+# dimension Integer --- The number of parameters per hyperplane.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# H matrix A num_hyperplanes x dimension matrix of hyperplane parameters.
+# --------------------------------------------------------------------------------------------
+gen_rand_hyperplanes = function(Integer num_hyperplanes, Integer dimension) return (Matrix[Double] H) {
+ H = rand(rows=num_hyperplanes, cols=dimension, min=-1, max=1);
+}
+
+# Creates a scalar hash code for each row in X_code by interpreting positive values as a
+# binary number.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X_code matrix --- The result of a plane equation for each row in X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_hash matrix A column vector of scalar hash codes. Same number of rows as X_code.
+# --------------------------------------------------------------------------------------------
+aggregate_lsh_code = function(Matrix[Double] X_code) return (Matrix[Double] X_hash) {
+ X_pos = (X_code > 0);
+ pos_weights = 2^seq(ncol(X_code)-1, 0);
+ X_weights = X_pos * t(pos_weights);
+ X_hash = rowSums(X_weights);
+}
+
+# Computes locality-sensitive hash codes for each row in X.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A matrix that should be hashed.
+# H matrix --- Hyperplane matrix with shape (num_hyperplanes, ncol(X)).
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_hash matrix A column vector with hash codes. Same number of rows as X.
+# --------------------------------------------------------------------------------------------
+lsh = function(Matrix[Double] X, Matrix[Double] H) return (Matrix[Double] X_hash) {
+ X_plane = X %*% t(H);
+ X_hash = aggregate_lsh_code(X_plane);
+}
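The sign-based hashing of `gen_rand_hyperplanes`, `aggregate_lsh_code`, and `lsh` can be sketched in NumPy. This is an illustrative translation, not part of the commit; with a fixed hyperplane matrix the resulting codes can be checked by hand:

```python
import numpy as np

def gen_rand_hyperplanes(num_hyperplanes, dimension, rng):
    # One random hyperplane normal per row, entries drawn from [-1, 1).
    return rng.uniform(-1.0, 1.0, size=(num_hyperplanes, dimension))

def lsh(X, H):
    # Project each row of X onto the hyperplane normals; the sign pattern, read
    # as a binary number (first hyperplane = most significant bit), is the hash.
    bits = (X @ H.T > 0).astype(np.int64)
    weights = 2 ** np.arange(H.shape[0] - 1, -1, -1)
    return bits @ weights

H = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, -1.0]])
X = np.array([[2.0, 1.0], [-2.0, -1.0], [2.0, 1.1]])
codes = lsh(X, H)  # -> [7, 0, 7]: similar rows collide, opposite rows do not
```

Rows with a small angle between them agree in most signs and therefore tend to land in the same bucket, which is what makes the hash locality-sensitive.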
+
+# Computes the start index of each hash bucket in a hash-sorted matrix.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A matrix whose first column contains LSH hash codes, sorted
+#              in ascending order.
+# dimensions Integer --- The number of hyperplanes, i.e. there are 2^dimensions
+#              possible hash codes.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# BLOCKS matrix A column vector with the start index of each block. Has one more
+#               row than blocks such that the last row is the end index of the last block.
+# --------------------------------------------------------------------------------------------
+lsh_find_blocks = function(Matrix[Double] X, Integer dimensions) return (Matrix[Double] BLOCKS){
+ BLOCKS = matrix(0, rows=2^dimensions+1, cols=1)
+ current_hash_value = 0.0;
+ BLOCKS[1,1] = 1
+ for (row_idx in 1:nrow(X)) {
+ this_hash = as.scalar(X[row_idx,1]);
+ if (current_hash_value < this_hash) {
+ for (h in (current_hash_value+2):(this_hash+1)) {
+ BLOCKS[h,1] = row_idx;
+ }
+ current_hash_value = this_hash;
+ }
+ }
+ n_missing_rows = nrow(BLOCKS) - (current_hash_value + 1);
+ BLOCKS[(current_hash_value+2):nrow(BLOCKS),] = matrix(nrow(X) + 1, rows=n_missing_rows, cols=1);
+}
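The boundary scan in `lsh_find_blocks` can be rendered in Python to make the bucket layout explicit. This sketch (not part of the commit) mirrors the 1-based indexing of the DML; empty buckets end up with start index equal to the next bucket's start:

```python
import numpy as np

def lsh_find_blocks(sorted_hashes, dimensions):
    # For the 2^dimensions possible hash codes, record the 1-based start row of
    # each bucket in the sorted hash column; one extra trailing entry marks the
    # end, so an empty bucket has start == end.
    blocks = np.zeros(2 ** dimensions + 1, dtype=np.int64)
    blocks[0] = 1
    current = 0
    for row_idx, h in enumerate(sorted_hashes, start=1):
        h = int(h)
        if current < h:
            blocks[current + 1 : h + 1] = row_idx
            current = h
    blocks[current + 1 :] = len(sorted_hashes) + 1
    return blocks

blocks = lsh_find_blocks([0, 0, 2, 2, 3], dimensions=2)
# -> [1, 3, 3, 5, 6]: bucket 0 is rows 1-2, bucket 1 is empty,
#    bucket 2 is rows 3-4, bucket 3 is row 5.
```

The DML callers later drop the empty buckets with `removeEmpty` on the block lengths.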
+
+# Blocks the given matrix X by locality sensitive hashing using a set of random hyperplanes.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A matrix that should be blocked by LSH.
+# num_hyperplanes Integer --- How many hyperplanes to use.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_index matrix A vector with the original row number for each row in X.
+# X_hash matrix A column vector with the scalar LSH hash code for each row in X.
+# X_sorted matrix The sorted input matrix X.
+# BLOCKS matrix A column vector with start indices for each block. Has one more row
+# than blocks such that the last row is the end index of the last block.
+# --------------------------------------------------------------------------------------------
+lsh_blocking = function(Matrix[Double] X, Integer num_hyperplanes) return (Matrix[Double] X_index, Matrix[Double] X_hash, Matrix[Double] X_sorted, Matrix[Double] BLOCKS) {
+ H = gen_rand_hyperplanes(num_hyperplanes, ncol(X));
+ X_hash = lsh(X, H);
+ [X_index, X_sorted_with_hash] = sort_by_vector(X, X_hash, TRUE);
+ BLOCKS = lsh_find_blocks(X_sorted_with_hash, num_hyperplanes);
+ B_lengths = rbind(BLOCKS[2:nrow(BLOCKS),] - BLOCKS[1:nrow(BLOCKS)-1,], as.matrix(1));
+ BLOCKS = removeEmpty(target=BLOCKS, margin="rows", select=B_lengths)
+ X_sorted = X_sorted_with_hash[,2:ncol(X_sorted_with_hash)];
+ X_hash = X_sorted_with_hash[,1];
+}
+
+# Blocks the matrices X and Y by locality sensitive hashing using a set of random hyperplanes.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A matrix that should be blocked by LSH.
+# Y matrix --- A matrix that should be blocked by LSH.
+# num_hyperplanes Integer --- How many hyperplanes to use.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_index matrix A vector with the original row number for each row in X.
+# X_hash matrix A column vector with the scalar LSH hash code for each row in X.
+# X_sorted matrix The sorted input matrix X.
+# Y_index matrix A vector with the original row number for each row in Y.
+# Y_hash matrix A column vector with the scalar LSH hash code for each row in Y.
+# Y_sorted matrix The sorted input matrix Y.
+# BLOCKS matrix A matrix with start indices for each block. Has one more row
+# than blocks such that the last row is the end index of the last block.
+# First column is X, second column Y indices.
+# --------------------------------------------------------------------------------------------
+lsh_blocking2 = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hyperplanes) return (
+ Matrix[Double] X_index, Matrix[Double] X_hash, Matrix[Double] X_sorted,
+ Matrix[Double] Y_index, Matrix[Double] Y_hash, Matrix[Double] Y_sorted,
+ Matrix[Double] BLOCKS) {
+ H = gen_rand_hyperplanes(num_hyperplanes, max(ncol(X), ncol(Y)));
+ X_hash = lsh(X, H[,1:ncol(X)]);
+ Y_hash = lsh(Y, H[,1:ncol(Y)]);
+ [X_index, Xs] = sort_by_vector(X, X_hash, TRUE);
+ [Y_index, Ys] = sort_by_vector(Y, Y_hash, TRUE);
+ X_blocks = lsh_find_blocks(Xs, num_hyperplanes);
+ Y_blocks = lsh_find_blocks(Ys, num_hyperplanes);
+ BLOCKS = cbind(X_blocks, Y_blocks)
+ B_lengths = rbind(BLOCKS[2:nrow(BLOCKS),] - BLOCKS[1:nrow(BLOCKS)-1,], matrix(1, rows=1, cols=2));
+ BLOCKS = removeEmpty(target=BLOCKS, margin="rows", select=B_lengths)
+ X_sorted = Xs[,2:ncol(Xs)];
+ X_hash = Xs[,1];
+ Y_sorted = Ys[,2:ncol(Ys)];
+ Y_hash = Ys[,1];
+}
diff --git a/scripts/staging/entity-resolution/primitives/clustering.dml b/scripts/staging/entity-resolution/primitives/clustering.dml
new file mode 100644
index 0000000..6c36476
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/clustering.dml
@@ -0,0 +1,158 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Recursive depth-first search which in some cases can run into stack overflow errors in the
+# SystemDS runtime. Not recommended, use dfs_iter instead!
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# GRAPH matrix --- An adjacency matrix.
+# vertex Integer --- The vertex to start the search from.
+# marked matrix --- Column-vector which is used to mark already visited
+# vertices. A value of 1 means already visited, 0 means
+# unvisited.
+# id matrix --- Column-vector which the connected component of each vertex
+# is written to.
+# count Integer --- The component id which is written to each visited
+#                   vertex's entry in the id vector.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# id matrix --- Column-vector which the connected component of each vertex is written to.
+# marked matrix --- Column-vector which is used to mark already visited vertices.
+# A value of 1 means already visited, 0 means unvisited.
+# --------------------------------------------------------------------------------------------
+dfs = function(Matrix[Double] GRAPH, Integer vertex, Matrix[Double] marked, Matrix[Double] id, Integer count) return (Matrix[Double] id, Matrix[Double] marked) {
+ marked[vertex,] = 1;
+ id[vertex,] = count;
+ for (col in 1:ncol(GRAPH)) {
+ if (as.scalar(GRAPH[vertex,col])) {
+ if (!as.scalar(marked[col,])) {
+ [id, marked] = dfs(GRAPH, col, marked, id, count);
+ }
+ }
+ }
+}
+
+# Iterative depth-first search with a manually allocated stack in order to avoid stack overflow
+# errors.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# GRAPH matrix --- An adjacency matrix.
+# start_vertex Integer --- The vertex to start the search from.
+# marked matrix --- Column-vector which is used to mark already visited
+# vertices. A value of 1 means already visited, 0 means
+# unvisited.
+# id matrix --- Column-vector which the connected component of each vertex
+# is written to.
+# current_component Integer --- The component id which is written to each visited
+#                               vertex's entry in the id vector.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# id matrix --- Column-vector which the connected component of each vertex is
+# written to.
+# marked matrix --- Column-vector which is used to mark already visited vertices.
+# A value of 1 means already visited, 0 means unvisited.
+# --------------------------------------------------------------------------------------------
+dfs_iter = function(Matrix[Double] GRAPH, Integer start_vertex, Matrix[Double] marked, Matrix[Double] id, Integer current_component) return (Matrix[Double] id, Matrix[Double] marked) {
+ nvert = ncol(GRAPH);
+ stack_idx = matrix(0, rows=nvert, cols=1);
+ stack_vid = matrix(0, rows=nvert, cols=1);
+
+ stack_ptr = 1;
+ stack_vid[stack_ptr,] = start_vertex;
+ stack_idx[stack_ptr,] = 1;
+
+ while(stack_ptr > 0) {
+ vertex = as.scalar(stack_vid[stack_ptr,]);
+ neigh_idx = as.scalar(stack_idx[stack_ptr,]);
+ if (neigh_idx == 1) {
+ id[vertex,] = current_component;
+ marked[vertex,] = 1;
+ }
+ # find next not visited neighbour
+ cont = TRUE;
+ while (cont) {
+ if (as.scalar(GRAPH[vertex, neigh_idx]) & !as.scalar(marked[neigh_idx,])) {
+ cont = FALSE;
+ }
+ else {
+ neigh_idx = neigh_idx + 1;
+ if (neigh_idx > nvert) {
+ cont = FALSE;
+ }
+ }
+ }
+ if (neigh_idx > nvert ) {
+ stack_ptr = stack_ptr - 1;
+ }
+ else if (neigh_idx == nvert & as.scalar(marked[neigh_idx,])) {
+ stack_ptr = stack_ptr - 1;
+ }
+ else {
+ stack_ptr = stack_ptr + 1;
+ stack_vid[stack_ptr,] = neigh_idx;
+ stack_idx[stack_ptr,] = 1;
+ }
+ }
+}
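Because DML has no native recursion-safe stack, `dfs_iter` manages stack arrays by hand. In a language with dynamic lists the same idea is much shorter; this simplified Python analogue (not part of the commit) shows the intended behavior:

```python
def dfs_iter(graph, start_vertex, marked, comp_id, current_component):
    # graph is an adjacency matrix given as rows of 0/1 values. An explicit
    # stack replaces recursion, so long paths cannot overflow the call stack.
    stack = [start_vertex]
    marked[start_vertex] = True
    comp_id[start_vertex] = current_component
    while stack:
        v = stack.pop()
        for w, edge in enumerate(graph[v]):
            if edge and not marked[w]:
                marked[w] = True
                comp_id[w] = current_component
                stack.append(w)

graph = [[0, 1, 0], [1, 0, 0], [0, 0, 0]]  # vertices 0-1 connected, 2 isolated
marked, comp_id = [False] * 3, [0] * 3
dfs_iter(graph, 0, marked, comp_id, 1)
# comp_id is now [1, 1, 0]: vertex 2 was never reached.
```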
+
+# Makes all connected components in the adjacency matrix ADJACENCY fully connected.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# ADJACENCY matrix --- An adjacency matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# COMPONENTS matrix An adjacency matrix with all connected components fully
+#                   connected; the diagonal (self-loops) is zeroed.
+# --------------------------------------------------------------------------------------------
+cluster_by_connected_components = function(Matrix[Double] ADJACENCY) return (Matrix[Double] COMPONENTS) {
+ #TODO could we reuse our components builtin?
+ rows = nrow(ADJACENCY);
+ marked = matrix(0, rows=rows, cols=1);
+ id = matrix(0, rows=nrow(ADJACENCY), cols=1);
+ count = 1;
+ for (s in 1:rows) {
+ if (!as.scalar(marked[s,])) {
+ #[id, marked] = dfs(ADJACENCY, s, marked, id, count);
+ [id, marked] = dfs_iter(ADJACENCY, s, marked, id, count);
+ count = count + 1;
+ }
+ }
+ COMPONENTS = outer(id, t(id), "==");
+ COMPONENTS = COMPONENTS - diag(diag(COMPONENTS));
+}
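The clustering step (label connected components, then connect every pair with the same label via the `outer(id, t(id), "==")` trick and zero the diagonal) can be sketched in Python. This is an illustrative translation, not part of the commit:

```python
import numpy as np

def cluster_by_connected_components(adjacency):
    # Label every vertex with its connected component via iterative DFS, then
    # connect all pairs of vertices that share a label and remove self-loops.
    n = adjacency.shape[0]
    comp = np.zeros(n, dtype=np.int64)
    count = 0
    for s in range(n):
        if comp[s] == 0:
            count += 1
            stack = [s]
            comp[s] = count
            while stack:
                v = stack.pop()
                for w in np.nonzero(adjacency[v])[0]:
                    if comp[w] == 0:
                        comp[w] = count
                        stack.append(w)
    out = (comp[:, None] == comp[None, :]).astype(float)
    np.fill_diagonal(out, 0.0)
    return out

# A path 0-1-2 plus an isolated vertex 3:
A = np.array([[0, 1, 0, 0], [1, 0, 1, 0], [0, 1, 0, 0], [0, 0, 0, 0]], dtype=float)
C = cluster_by_connected_components(A)
# C[0, 2] == 1: the component {0, 1, 2} is now fully connected.
```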
diff --git a/scripts/staging/entity-resolution/primitives/evaluation.dml b/scripts/staging/entity-resolution/primitives/evaluation.dml
new file mode 100644
index 0000000..9601582
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/evaluation.dml
@@ -0,0 +1,142 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Calculates the fraction of correctly predicted values in PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# PRED matrix --- Predicted values, same shape as GT.
+# GT matrix --- Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# score Double Fraction of correct values. 0 means all incorrect, 1 means all correct.
+# --------------------------------------------------------------------------------------------
+accuracy = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+ HITS = PRED == GT;
+ sum = sum(HITS);
+ total = length(HITS);
+ score = sum / total;
+}
+
+# Calculates the precision of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the fraction of predicted positives that are correct:
+# precision = |true_positives| / |predicted_positives|
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# PRED matrix --- Predicted values, same shape as GT.
+# GT matrix --- Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# score Double The precision score, as described above.
+# --------------------------------------------------------------------------------------------
+precision = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+ tp = sum(PRED * GT);
+ score = tp / sum(PRED);
+}
+
+# Calculates the recall of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the fraction of ground-truth positives that are correctly predicted:
+# recall = |true_positives| / |ground_truth_positives|
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# PRED matrix --- Predicted values, same shape as GT.
+# GT matrix --- Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# score Double The recall score, as described above.
+# --------------------------------------------------------------------------------------------
+recall = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double score) {
+ tp = sum(PRED * GT);
+ score = tp / sum(GT);
+}
+
+# Computes the combiner used for the F1 score. (Note: despite its name, this is
+# the harmonic mean of the two values, not the geometric mean.)
+geometric_mean2 = function(Double a, Double b) return (Double geometric_mean) {
+ geometric_mean = 2 * (a * b) / (a + b);
+}
+
+# Calculates the F1 score of PRED given a ground truth GT.
+# In both inputs, the value 0 means no prediction.
+#
+# This is the harmonic mean of the precision and recall scores.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# PRED matrix --- Predicted values, same shape as GT.
+# GT matrix --- Ground truth values, same shape as PRED.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# f1 Double The F1 score, as described above.
+# precision Double The precision score, as described above.
+# recall Double The recall score, as described above.
+# --------------------------------------------------------------------------------------------
+f1 = function(Matrix[Double] PRED, Matrix[Double] GT) return (Double f1, Double precision, Double recall) {
+ precision = precision(PRED, GT);
+ recall = recall(PRED, GT);
+ f1 = geometric_mean2(precision, recall);
+}
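On 0/1 adjacency matrices these metrics reduce to element-wise sums, which a short NumPy sketch makes concrete (illustrative only, not part of the commit; like the DML, it assumes binary inputs):

```python
import numpy as np

def precision(pred, gt):
    # Fraction of predicted links that are true links (inputs are 0/1 matrices).
    return np.sum(pred * gt) / np.sum(pred)

def recall(pred, gt):
    # Fraction of true links that were predicted.
    return np.sum(pred * gt) / np.sum(gt)

def f1(pred, gt):
    # Harmonic mean of precision and recall.
    p, r = precision(pred, gt), recall(pred, gt)
    return 2 * p * r / (p + r), p, r

PRED = np.array([[0, 1], [1, 1]], dtype=float)
GT = np.array([[0, 1], [1, 0]], dtype=float)
score, p, r = f1(PRED, GT)
# p = 2/3 (two of three predicted links are true), r = 1.0, so F1 = 0.8.
```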
+
+# Calculates evaluation scores for PRED given a ground truth GT and prints them.
+# In both inputs, the value 0 means no prediction.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# PRED matrix --- Predicted values, same shape as GT.
+# GT matrix --- Ground truth values, same shape as PRED.
+# --------------------------------------------------------------------------------------------
+print_eval_stats = function(Matrix[Double] PRED, Matrix[Double] GT) {
+ acc = accuracy(PRED, GT);
+ [f1, precision, recall] = f1(PRED, GT);
+ print("Evaluation statistics:");
+ print(" PRED_nnz : %d", as.integer(sum(PRED != 0.0)));
+ print(" GT_nnz : %d", as.integer(sum(GT != 0.0)));
+ print(" Accuracy : %6.4f", acc);
+ print(" Precision: %6.4f", precision);
+ print(" Recall : %6.4f", recall);
+ print(" F1 score : %6.4f", f1);
+}
diff --git a/scripts/staging/entity-resolution/primitives/matching.dml b/scripts/staging/entity-resolution/primitives/matching.dml
new file mode 100644
index 0000000..5330bb8
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/matching.dml
@@ -0,0 +1,108 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Calculates the cosine similarity between rows of the given matrix X.
+# WARNING: Division by zero will occur for zero rows.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Numeric matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# COS matrix Symmetric matrix of cosine similarity between rows of X.
+# --------------------------------------------------------------------------------------------
+cosine = function(Matrix[Double] X) return (Matrix[Double] COS) {
+ Row_norm = sqrt(rowSums(X ^ 2));
+ NORM = Row_norm %*% t(Row_norm);
+ DOT = X %*% t(X);
+ COS = DOT / NORM;
+}
+
+# Calculates the cosine similarity between rows of X and rows of Y.
+# Uses an epsilon value for the euclidean norm of zero rows in order to avoid division by zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Numeric matrix. ncols must match Y.
+# Y matrix --- Numeric matrix. ncols must match X.
+# epsilon double --- The value to replace the norm of zero rows with.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# COS matrix nrows(X) by nrows(Y) matrix of cosine similarity between rows of X and Y.
+# --------------------------------------------------------------------------------------------
+cosine2_eps = function(Matrix[Double] X, Matrix[Double] Y, Double epsilon) return (Matrix[Double] COS) {
+ X_row_norm = sqrt(rowSums(X ^ 2));
+ X_row_norm = X_row_norm + (X_row_norm < epsilon) * epsilon
+ Y_row_norm = sqrt(rowSums(Y ^ 2));
+ Y_row_norm = Y_row_norm + (Y_row_norm < epsilon) * epsilon
+ NORM = X_row_norm %*% t(Y_row_norm);
+ DOT = X %*% t(Y);
+ COS = DOT / NORM;
+}
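The epsilon guard in `cosine2_eps` can be sketched in NumPy (illustrative only, not part of the commit): rows whose norm falls below epsilon get the norm bumped, so zero rows produce similarity 0 rather than NaN:

```python
import numpy as np

def cosine2_eps(X, Y, epsilon=1e-6):
    # Pairwise cosine similarity between rows of X and rows of Y; the norm of a
    # (near-)zero row is increased by epsilon so no entry divides by zero.
    x_norm = np.sqrt((X ** 2).sum(axis=1))
    x_norm = x_norm + (x_norm < epsilon) * epsilon
    y_norm = np.sqrt((Y ** 2).sum(axis=1))
    y_norm = y_norm + (y_norm < epsilon) * epsilon
    return (X @ Y.T) / np.outer(x_norm, y_norm)

X = np.array([[1.0, 0.0], [0.0, 0.0]])  # second row is all zeros
Y = np.array([[1.0, 0.0], [0.0, 1.0]])
S = cosine2_eps(X, Y)
# S[0] == [1, 0]; the zero row yields similarity 0 everywhere instead of NaN.
```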
+
+# Calculates the cosine similarity between rows of X and rows of Y.
+# Uses an epsilon value for the euclidean norm of zero rows in order to avoid division by zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Numeric matrix. ncols must match Y.
+# Y matrix --- Numeric matrix. ncols must match X.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# COS matrix nrows(X) by nrows(Y) matrix of cosine similarity between rows of X and Y.
+# --------------------------------------------------------------------------------------------
+cosine2 = function(Matrix[Double] X, Matrix[Double] Y) return (Matrix[Double] COS) {
+ COS = cosine2_eps(X, Y, 1e-6)
+}
+
+# Sets values of the given matrix that are at or below the given threshold to zero.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- Numeric matrix.
+# threshold double --- Values in X at or below this value are set to 0.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# THRES matrix X with values at or below the threshold set to 0.
+# --------------------------------------------------------------------------------------------
+tresholding = function(Matrix[Double] X, Double threshold) return (Matrix[Double] THRES) {
+ THRES = (X > threshold) * X;
+}
diff --git a/scripts/staging/entity-resolution/primitives/pipeline.dml b/scripts/staging/entity-resolution/primitives/pipeline.dml
new file mode 100644
index 0000000..e6af155
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/pipeline.dml
@@ -0,0 +1,220 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+source("./scripts/staging/entity-resolution/primitives/blocking.dml") as block;
+source("./scripts/staging/entity-resolution/primitives/matching.dml") as match;
+source("./scripts/staging/entity-resolution/primitives/clustering.dml") as cluster;
+
+
+# Very simple entity clustering pipeline which should work relatively well for small datasets.
+#
+# Blocks the input dataset X into num_blocks non-overlapping regions, after sorting the dataset
+# by the sum of its rows. This is a very simple blocking scheme which serves mainly as a baseline
+# example and will result in non-optimal performance. However, if no blocking is needed, this
+# can be used with num_blocks=1.
+#
+# Uses a threshold for similarity to link entities and clusters them by also connecting all
+# entities in each connected component ('makes each connected component fully connected').
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A dataset for which duplicates should be found.
+# num_blocks Integer --- How many blocks to produce.
+# threshold Double --- Similarity threshold which is used to decide if two entities
+# are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates.
+# --------------------------------------------------------------------------------------------
+entity_clustering_pipeline = function(Matrix[Double] X, Integer num_blocks, Double threshold) return (Matrix[Double] X_cluster) {
+ # First sort the matrix
+ [X_index, X_sorted] = block::row_sum_sorting(X);
+ X = X_sorted;
+ # Perform blocking: match and cluster each block
+ blocks = block::naive_blocking(X_sorted, num_blocks);
+ X_cluster = matrix(0, nrow(X_sorted), nrow(X_sorted));
+ # SystemDS raises false positives in its loop dependency check here, but the matrix
+ # chunks are addressed independently, so the error is suppressed with check = 0.
+ parfor (i in 1:nrow(blocks)-1, check = 0) {
+ block_start = as.scalar(blocks[i,]);
+ block_end = as.scalar(blocks[i+1,])-1;
+ X_block = X_sorted[block_start:block_end,];
+ X_sim = match::cosine(X_block);
+ X_match = (X_sim > threshold);
+ X_comp = cluster::cluster_by_connected_components(X_match);
+ X_cluster[block_start:block_end,block_start:block_end] = X_comp * X_sim;
+ }
+ # Reindex back the symmetrical matrix
+ X_cluster = block::reindex_rows_and_cols(X_cluster, X_index);
+}
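For the `num_blocks=1` case, the whole pipeline (cosine similarities, thresholding, connected-component clustering, similarity-weighted output) fits in a short NumPy sketch. `entity_clustering` is a hypothetical name for this illustration, not part of the commit:

```python
import numpy as np

def entity_clustering(X, threshold):
    # Single-block variant of the pipeline: cosine similarities, threshold into
    # a match graph, then make each connected component fully connected and
    # weight the linked pairs by their similarity.
    norms = np.sqrt((X ** 2).sum(axis=1))
    sim = (X @ X.T) / np.outer(norms, norms)
    match = sim > threshold
    n = X.shape[0]
    comp = np.zeros(n, dtype=np.int64)
    count = 0
    for s in range(n):
        if comp[s] == 0:
            count += 1
            stack = [s]
            comp[s] = count
            while stack:
                v = stack.pop()
                for w in np.nonzero(match[v])[0]:
                    if comp[w] == 0:
                        comp[w] = count
                        stack.append(w)
    cluster = (comp[:, None] == comp[None, :]).astype(float)
    np.fill_diagonal(cluster, 0.0)
    return cluster * sim

X = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0]])
C = entity_clustering(X, 0.8)
# C[0, 1] > 0 (near-duplicate rows linked), C[0, 2] == 0 (orthogonal row unlinked).
```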
+
+# Entity clustering pipeline using locality-sensitive hashing as a blocking algorithm to improve
+# runtime on large datasets. The tradeoff between accuracy and performance can be configured
+# via the num_hashtables and num_hyperplanes parameters.
+#
+# For more details on LSH, see:
+# Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+# Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+#
+# Uses a threshold for similarity to link entities and clusters them by also connecting all
+# entities in each connected component ('makes each connected component fully connected').
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A dataset for which duplicates should be found.
+# num_hashtables Integer --- How often to block the dataset using random hyperplanes and
+# compute similarities.
+# Increases runtime and improves accuracy.
+# num_hyperplanes Integer --- The dimensionality of the random hyperplanes.
+# Higher values produce smaller blocks and require a higher
+# number of num_hashtables to avoid losing accuracy.
+# threshold Double --- Similarity threshold which is used to decide if two entities
+# are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X_cluster matrix A symmetrical adjacency matrix for X defining the found duplicates.
+# --------------------------------------------------------------------------------------------
+entity_clustering_pipeline_lsh = function(Matrix[Double] X, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] X_cluster) {
+ X_cluster = matrix(0, nrow(X), nrow(X));
+ # First get the LSH blocks
+ for (hashtable in 1:num_hashtables, check = 0) {
+ [X_index, X_hash, X_sorted, blocks] = block::lsh_blocking(X, num_hyperplanes);
+ X_cluster_local = matrix(0, nrow(X), nrow(X));
+ ## Perform blocking: match and cluster each block
+ parfor (i in 1:nrow(blocks)-1, check = 0) {
+ block_start = as.scalar(blocks[i,]);
+ block_end = as.scalar(blocks[i+1,])-1;
+ # Only apply to existing blocks
+ X_block = X_sorted[block_start:block_end,];
+ X_sim = match::cosine(X_block);
+ X_match = (X_sim > threshold);
+ X_comp = cluster::cluster_by_connected_components(X_match);
+ X_new_block = X_cluster_local[block_start:block_end,block_start:block_end] | (X_comp * X_sim);
+ # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
+ if (sum(X_new_block) > 0) {
+ X_cluster_local[block_start:block_end,block_start:block_end] = X_new_block;
+ }
+ }
+ ## Reindex the symmetric matrix back to the original row order
+ X_cluster = X_cluster | block::reindex_rows_and_cols(X_cluster_local, X_index);
+ }
+}
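The LSH blocking that both `*_lsh` pipelines rely on can be sketched as random-hyperplane signatures: rows whose feature vectors fall on the same side of every random hyperplane share a signature and therefore a block, which approximates grouping by cosine similarity. A minimal NumPy sketch under that assumption (names are illustrative; the DML `block::lsh_blocking` primitive additionally sorts rows by signature and returns block boundaries, which is omitted here):

```python
import numpy as np

def lsh_signatures(X: np.ndarray, num_hyperplanes: int, seed: int = 0) -> np.ndarray:
    """Hash each row of X to an integer signature via random hyperplanes.

    Each bit records the sign of the dot product with one random normal
    vector; rows with equal signatures would land in the same block.
    """
    rng = np.random.default_rng(seed)
    H = rng.standard_normal((X.shape[1], num_hyperplanes))
    bits = (X @ H) > 0
    # Pack the bit pattern into a single integer id per row for grouping.
    return bits @ (1 << np.arange(num_hyperplanes))
```

Repeating this with `num_hashtables` independent hyperplane sets, as the pipelines above do, reduces the chance that a true duplicate pair is separated in every hashing round.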
+
+# Very simple binary entity resolution pipeline which computes similarity between rows of two
+# datasets. This mainly serves as a baseline example and does not use blocking. It is not
+# suitable for large datasets.
+#
+# Uses a threshold for similarity to link entities.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A dataset to be compared with Y.
+# Y matrix --- A dataset to be compared with X.
+# threshold Double --- Similarity threshold which is used to decide if two
+# entities are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y.
+# Shape is (nrow(X), nrow(Y)).
+# --------------------------------------------------------------------------------------------
+binary_entity_resolution_pipeline = function(Matrix[Double] X, Matrix[Double] Y, Double threshold) return (Matrix[Double] XY_pairs) {
+ XY_sim = match::cosine2(X, Y);
+ XY_pairs = match::tresholding(XY_sim, threshold);
+}
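The baseline pipeline above is just row-wise cosine similarity followed by thresholding. A self-contained NumPy sketch of the same computation (illustrative, not the DML `match::` primitives):

```python
import numpy as np

def cosine_pairs(X: np.ndarray, Y: np.ndarray, threshold: float) -> np.ndarray:
    """All-pairs cosine similarity between rows of X and Y, thresholded
    to a 0/1 duplicate-pair matrix of shape (nrow(X), nrow(Y))."""
    Xn = X / np.maximum(np.linalg.norm(X, axis=1, keepdims=True), 1e-12)
    Yn = Y / np.maximum(np.linalg.norm(Y, axis=1, keepdims=True), 1e-12)
    sim = Xn @ Yn.T
    return (sim > threshold).astype(float)
```

As noted in the docstring above the DML function, this materializes an nrow(X) x nrow(Y) similarity matrix, which is why blocking is needed for large datasets.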
+
+# Binary entity resolution pipeline using locality-sensitive hashing as a blocking algorithm
+# to improve runtime on large datasets.
+#
+# The tradeoff between accuracy and performance can be configured
+# via the num_hashtables and num_hyperplanes parameters.
+#
+# For more details on LSH, see:
+# Ebraheem, Muhammad, et al. "Distributed representations of tuples for entity resolution."
+# Proceedings of the VLDB Endowment 11.11 (2018): 1454-1467.
+#
+# Uses a threshold for similarity to link entities.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A dataset to be compared with Y.
+# Y matrix --- A dataset to be compared with X.
+# num_hashtables Integer --- How often to block the dataset using random hyperplanes and
+# compute similarities.
+# Increases runtime and improves accuracy.
+# num_hyperplanes Integer --- The dimensionality of the random hyperplanes.
+# Higher values produce smaller blocks and require a higher
+# number of num_hashtables to avoid losing accuracy.
+# threshold Double --- Similarity threshold which is used to decide if two
+# entities are duplicates.
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# XY_pairs matrix An adjacency matrix defining the found duplicates between X and Y.
+# Shape is (nrow(X), nrow(Y)).
+# --------------------------------------------------------------------------------------------
+binary_entity_resolution_pipeline_lsh = function(Matrix[Double] X, Matrix[Double] Y, Integer num_hashtables, Integer num_hyperplanes, Double threshold) return (Matrix[Double] XY_pairs) {
+ XY_pairs = matrix(0, nrow(X), nrow(Y));
+ for (hashtable in 1:num_hashtables, check = 0) {
+ [X_index, X_hash, X_sorted, Y_index, Y_hash, Y_sorted, blocks] = block::lsh_blocking2(X, Y, num_hyperplanes);
+ XY_pairs_local = matrix(0, nrow(X), nrow(Y));
+ parfor (i in 1:nrow(blocks)-1, check = 0) {
+ block_start = blocks[i,];
+ block_end = blocks[i+1,];
+ x_start = as.scalar(block_start[1,1]);
+ x_end = as.scalar(block_end[1,1]) - 1;
+
+ y_start = as.scalar(block_start[1,2]);
+ y_end = as.scalar(block_end[1,2]) - 1;
+ if ((x_end - x_start + 1) > 0 & (y_end - y_start + 1) > 0) {
+ X_block = X_sorted[x_start:x_end,];
+ Y_block = Y_sorted[y_start:y_end,];
+
+ XY_sim = match::cosine2(X_block, Y_block);
+ XY_pairs_block = match::tresholding(XY_sim, threshold);
+ # Workaround for a bug where assigning a sparse matrix multiple times leads to an error in the SystemDS runtime.
+ if (sum(XY_pairs_block) > 0) {
+ XY_pairs_local[x_start:x_end,y_start:y_end] = XY_pairs_block;
+ }
+ }
+ }
+ XY_pairs_local = block::reindex_rowwise(XY_pairs_local, X_index);
+ XY_pairs_local = t(XY_pairs_local);
+ XY_pairs_local = block::reindex_rowwise(XY_pairs_local, Y_index);
+ XY_pairs = XY_pairs | t(XY_pairs_local);
+ }
+}
diff --git a/scripts/staging/entity-resolution/primitives/postprocessing.dml b/scripts/staging/entity-resolution/primitives/postprocessing.dml
new file mode 100644
index 0000000..2110dfa
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/postprocessing.dml
@@ -0,0 +1,80 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Inverse operation of table. Converts a contingency table/adjacency matrix to a list of tuples
+# in the form (id1, id2, value) representing edges in the case of an adjacency matrix.
+#
+# The offsets can be used in order to restore the original ids in case the input to table was
+# concatenated from two datasets.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A contingency table/adjacency matrix.
+# row_offset Integer --- The number of rows of the first concatenated dataset.
+# Use 0 for the default case.
+# col_offset Integer --- The number of columns of the first concatenated dataset.
+# Use 0 for the default case.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# SPARSE matrix --- List of tuples in the form (id1, id2, value).
+# --------------------------------------------------------------------------------------------
+untable_offset = function(Matrix[Double] X, Integer row_offset, Integer col_offset) return (Matrix[Double] SPARSE) {
+ n_row = nrow(X);
+ n_col = ncol(X);
+ n = n_row * n_col;
+
+ mat1 = seq(1+row_offset,n_row+row_offset) %*% matrix(1, rows=1, cols=n_col);
+ col1 = matrix(mat1, rows=n, cols=1, byrow=TRUE);
+ # col1 is a vector of indices repeated like 111 222 333 ...
+
+ mat2 = seq(1+col_offset,n_col+col_offset) %*% matrix(1, rows=1, cols=n_row);
+ col2 = matrix(mat2, rows=n, cols=1, byrow=FALSE);
+ # col2 is a vector of indices repeated like 123 123 123 ...
+
+ col3 = matrix(X, rows=n, cols=1, byrow=TRUE);
+ dense = cbind(col1, col2, col3);
+
+ SPARSE = removeEmpty(target=dense, margin="rows", select=dense[,3]);
+}
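The same inverse-table construction is easy to mirror in NumPy for intuition: enumerate all cells, keep the non-zero ones, and shift indices by the offsets. A sketch with an illustrative name (`np.nonzero` plays the role of the `removeEmpty` select above; note `nonzero` enumerates row-major, matching the 111 222 333 ordering of col1):

```python
import numpy as np

def untable(X: np.ndarray, row_offset: int = 0, col_offset: int = 0) -> np.ndarray:
    """Convert an adjacency/contingency matrix to (id1, id2, value) triples,
    keeping only non-zero entries. Indices are 1-based like in DML."""
    rows, cols = np.nonzero(X)
    values = X[rows, cols]
    return np.column_stack([rows + 1 + row_offset, cols + 1 + col_offset, values])
```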
+
+# Inverse operation of table. Converts a contingency table/adjacency matrix to a list of tuples
+# in the form (id1, id2, value) representing edges in the case of an adjacency matrix.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A contingency table/adjacency matrix.
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# SPARSE matrix --- List of tuples in the form (id1, id2, value).
+# --------------------------------------------------------------------------------------------
+untable = function(Matrix[Double] X) return (Matrix[Double] SPARSE) {
+ SPARSE = untable_offset(X, 0, 0);
+}
\ No newline at end of file
diff --git a/scripts/staging/entity-resolution/primitives/preprocessing.dml b/scripts/staging/entity-resolution/primitives/preprocessing.dml
new file mode 100644
index 0000000..58c8431
--- /dev/null
+++ b/scripts/staging/entity-resolution/primitives/preprocessing.dml
@@ -0,0 +1,85 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Converts a dataframe with form (id, token, weight) into a contingency table bag-of-words
+# representation.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# FX frame --- A dataframe with form (id, token, weight).
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- A contingency table. Shape is (num_unique_ids, num_unique_tokens).
+# MX frame --- The recoding meta-information for ids and tokens that is needed
+# to convert indices in the X matrix back to their original
+# id/token.
+# --------------------------------------------------------------------------------------------
+convert_frame_tokens_to_matrix_bow = function(Frame[Unknown] FX) return (Matrix[Double] X, Frame[String] MX) {
+ jspecx = "{recode:[C1,C2]}";
+ [X0, MX] = transformencode(target=FX, spec=jspecx);
+ X = table(X0[,1], X0[,2], X0[,3]);
+}
+
+# Converts two dataframes with form (id, token, weight) into contingency table bag-of-words
+# representations. Makes sure both contingency tables are using the same vocabulary.
+#
+# INPUT PARAMETERS:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE DEFAULT MEANING
+# --------------------------------------------------------------------------------------------
+# FX frame --- A dataframe with form (id, token, weight).
+# FY frame --- A dataframe with form (id, token, weight).
+#
+# Output:
+# --------------------------------------------------------------------------------------------
+# NAME TYPE MEANING
+# --------------------------------------------------------------------------------------------
+# X matrix --- The contingency table for FX.
+# Shape is (X_num_unique_ids, XY_num_unique_tokens).
+# Uses same token order and encoding as Y.
+# Y matrix --- The contingency table for FY.
+# Shape is (Y_num_unique_ids, XY_num_unique_tokens).
+# Uses same token order and encoding as X.
+# M_tokens frame --- The recoding meta-information for tokens that is needed
+# to convert column indices in the contingency tables back
+# to their token strings.
+# MX_ids frame --- The recoding meta-information for X ids that is needed
+# to convert row indices in X back to ids for FX.
+# MY_ids frame --- The recoding meta-information for Y ids that is needed
+# to convert row indices in Y back to ids for FY.
+# --------------------------------------------------------------------------------------------
+convert_frame_tokens_to_matrix_bow_2 = function(Frame[Unknown] FX, Frame[Unknown] FY) return (Matrix[Double] X, Matrix[Double] Y, Frame[String] M_tokens, Frame[String] MX_ids, Frame[String] MY_ids) {
+ [E_tokens, M_tokens] = transformencode(target=rbind(FX[,2], FY[,2]), spec="{recode:[C1]}");
+ [Y_ids, MY_ids] = transformencode(target=FY[,1], spec="{recode:[C1]}");
+ [X_ids, MX_ids] = transformencode(target=FX[,1], spec="{recode:[C1]}");
+
+ X_tokens = E_tokens[1:nrow(FX),];
+ Y_tokens = E_tokens[(nrow(FX)+1):nrow(E_tokens),];
+
+ ncols = max(max(X_tokens), max(Y_tokens));
+ X = table(X_ids[,1], X_tokens[,1], as.matrix(FX[,3]), nrow(X_ids), ncols);
+ Y = table(Y_ids[,1], Y_tokens[,1], as.matrix(FY[,3]), nrow(Y_ids), ncols);
+}
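The recode-then-table encoding used by both functions above can be mimicked in plain Python: map ids and tokens to contiguous indices, then accumulate weights into a dense matrix. A hedged sketch (illustrative name; `transformencode` additionally returns reusable recode metadata frames, which are reduced here to plain sorted lists):

```python
import numpy as np

def tokens_to_bow(triples):
    """Build a bag-of-words matrix from (id, token, weight) triples.

    Ids and tokens are recoded to contiguous 0-based indices, mirroring
    the transformencode + table combination in the DML script.
    """
    ids = sorted({t[0] for t in triples})
    vocab = sorted({t[1] for t in triples})
    id_ix = {v: i for i, v in enumerate(ids)}
    tok_ix = {v: i for i, v in enumerate(vocab)}
    X = np.zeros((len(ids), len(vocab)))
    for rid, tok, w in triples:
        X[id_ix[rid], tok_ix[tok]] += w
    return X, ids, vocab
```

For the two-dataset variant, the key point mirrored from the DML code is that the token vocabulary must be built from the union of both inputs so column indices agree between X and Y.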
diff --git a/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java b/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
index 8b3798d..c36d53f 100644
--- a/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
+++ b/src/main/java/org/apache/sysds/hops/recompile/LiteralReplacement.java
@@ -40,6 +40,7 @@ import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.common.Types.OpOpN;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
+import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.cp.Data;
@@ -310,7 +311,7 @@ public class LiteralReplacement
if( data instanceof DataOp && vars.keySet().contains(data.getName())
&& isIntValueDataLiteral(rl, vars) && isIntValueDataLiteral(ru, vars)
- && isIntValueDataLiteral(cl, vars) && isIntValueDataLiteral(cu, vars) )
+ && isIntValueDataLiteral(cl, vars) && isIntValueDataLiteral(cu, vars) )
{
long rlval = getIntValueDataLiteral(rl, vars);
long ruval = getIntValueDataLiteral(ru, vars);
@@ -435,10 +436,10 @@ public class LiteralReplacement
private static boolean isIntValueDataLiteral(Hop h, LocalVariableMap vars)
{
- return ( (h instanceof DataOp && vars.keySet().contains(h.getName()))
- || h instanceof LiteralOp
- ||(h instanceof UnaryOp && (((UnaryOp)h).getOp()==OpOp1.NROW || ((UnaryOp)h).getOp()==OpOp1.NCOL)
- && h.getInput().get(0) instanceof DataOp && vars.keySet().contains(h.getInput().get(0).getName())) );
+ return ( (h instanceof DataOp && vars.keySet().contains(h.getName()))
+ || h instanceof LiteralOp
+ ||(h instanceof UnaryOp && (((UnaryOp)h).getOp()==OpOp1.NROW || ((UnaryOp)h).getOp()==OpOp1.NCOL)
+ && h.getInput().get(0) instanceof DataOp && vars.keySet().contains(h.getInput().get(0).getName())) );
}
private static long getIntValueDataLiteral(Hop hop, LocalVariableMap vars)
@@ -447,26 +448,22 @@ public class LiteralReplacement
try
{
- if( hop instanceof LiteralOp )
- {
+ if( hop instanceof LiteralOp ) {
value = HopRewriteUtils.getIntValue((LiteralOp)hop);
}
- else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NROW )
- {
+ else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NROW ) {
//get the dimension information from the matrix object because the hop
//dimensions might not have been updated during recompile
- MatrixObject mo = (MatrixObject)vars.get(hop.getInput().get(0).getName());
+ CacheableData<?> mo = (CacheableData<?>)vars.get(hop.getInput().get(0).getName());
value = mo.getNumRows();
}
- else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NCOL )
- {
+ else if( hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==OpOp1.NCOL ) {
//get the dimension information from the matrix object because the hop
//dimensions might not have been updated during recompile
- MatrixObject mo = (MatrixObject)vars.get(hop.getInput().get(0).getName());
+ CacheableData<?> mo = (CacheableData<?>)vars.get(hop.getInput().get(0).getName());
value = mo.getNumColumns();
}
- else
- {
+ else {
ScalarObject sdat = (ScalarObject) vars.get(hop.getName());
value = sdat.getLongValue();
}
@@ -522,5 +519,4 @@ public class LiteralReplacement
return val;
}
-
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index c87490a..602393e 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -335,6 +335,14 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
return _metaData.getDataCharacteristics();
}
+ public long getNumRows() {
+ return getDataCharacteristics().getRows();
+ }
+
+ public long getNumColumns() {
+ return getDataCharacteristics().getCols();
+ }
+
public abstract void refreshMetaData();
/**
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index f850e73..7509c02 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -177,14 +177,6 @@ public class MatrixObject extends CacheableData<MatrixBlock>
mc.setNonZeros( _data.getNonZeros() );
}
- public long getNumRows() {
- return getDataCharacteristics().getRows();
- }
-
- public long getNumColumns() {
- return getDataCharacteristics().getCols();
- }
-
public long getBlocksize() {
return getDataCharacteristics().getBlocksize();
}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java
new file mode 100644
index 0000000..85a6d09
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBinaryTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+public class EntityResolutionBinaryTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "EntityResolutionBinary";
+ private final static String TEST_DIR = "applications/entity_resolution/binary/";
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_DIR, TEST_NAME);
+ }
+
+ @Test
+ public void testParams1() {
+ testScriptEndToEnd(1, 1);
+ }
+ @Test
+ public void testParams2() {
+ testScriptEndToEnd(1, 3);
+ }
+ @Test
+ public void testParams3() {
+ testScriptEndToEnd(3, 1);
+ }
+ @Test
+ public void testParams4() {
+ testScriptEndToEnd(5, 5);
+ }
+
+ public void testScriptEndToEnd(int numLshHashtables, int numLshHyperplanes) {
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config);
+ fullDMLScriptName = "./scripts/staging/entity-resolution/binary-entity-resolution.dml";
+
+ programArgs = new String[]{
+ "-nvargs", //
+ "FX=" + sourceDirectory + "input.csv", //
+ "FY=" + sourceDirectory + "input.csv", //
+ "OUT=" + output("B"), //
+ "num_hashtables=" + numLshHashtables,
+ "num_hyperplanes=" + numLshHyperplanes,
+ };
+
+ runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+ // LSH is not deterministic, so in this test we just assert that it runs and produces a file
+ Assert.assertTrue(Files.exists(Paths.get(output("B"))));
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java
new file mode 100644
index 0000000..36bf0e5
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionBlockingTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Test;
+
+public class EntityResolutionBlockingTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "EntityResolutionBlocking";
+ private final static String TEST_DIR = "applications/entity_resolution/blocking/";
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_DIR, "blocking_naive", new String[]{"B"}));
+ }
+
+ @Test
+ public void testNaive1() {
+ testNaiveBlocking(
+ new double[][]{{0,},},
+ 10,
+ new double[][]{{1,},{2,},}
+ );
+ }
+ @Test
+ public void testNaive2() {
+ testNaiveBlocking(
+ new double[][]{{0,},{1,},},
+ 1,
+ new double[][]{{1,},{3,},}
+ );
+ }
+ @Test
+ public void testNaive3() {
+ testNaiveBlocking(
+ new double[][]{{0,},{1,},},
+ 2,
+ new double[][]{{1,},{2,},{3,},});
+ }
+ @Test
+ public void testNaive4() {
+ testNaiveBlocking(
+ new double[][]{{0,},{1,},{2,},},
+ 2,
+ new double[][]{{1,}, {3,},{4,},});
+ }
+
+ public void testNaiveBlocking(double[][] dataset, int targetNumBlocks, double[][] expectedBlockingIndices) {
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config);
+ fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml";
+
+ programArgs = new String[]{
+ "-nvargs", //
+ "inFile=" + input("A"), //
+ "outFile=" + output("B"), //
+ "targetNumBlocks=" + targetNumBlocks
+ };
+ writeInputMatrixWithMTD("A", dataset, false);
+ writeExpectedMatrix("B", expectedBlockingIndices);
+ runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+ compareResults();
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java
new file mode 100644
index 0000000..daf751c
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionClusteringTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class EntityResolutionClusteringTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "EntityResolutionClustering";
+ private final static String TEST_DIR = "applications/entity_resolution/clustering/";
+
+ enum BlockingMethod {
+ NAIVE,
+ LSH,
+ }
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_DIR, TEST_NAME);
+ }
+
+ @Test
+ public void testNaive1() throws IOException {
+ testScriptEndToEnd(0.3, 1, BlockingMethod.NAIVE, 0, 0);
+ }
+ @Test
+ public void testLSH1() throws IOException {
+ testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 1, 1);
+ }
+ @Test
+ public void testLSH2() throws IOException {
+ testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 1, 3);
+ }
+ @Test
+ public void testLSH3() throws IOException {
+ testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 3, 1);
+ }
+ @Test
+ public void testLSH4() throws IOException {
+ testScriptEndToEnd(0.3, 1, BlockingMethod.LSH, 5, 5);
+ }
+
+ public void testScriptEndToEnd(double threshold, int numBlocks, BlockingMethod blockingMethod, int numLshHashtables, int numLshHyperplanes) throws IOException {
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config);
+ fullDMLScriptName = "./scripts/staging/entity-resolution/entity-clustering.dml";
+
+ programArgs = new String[]{
+ "-nvargs", //
+ "FX=" + sourceDirectory + "input.csv", //
+ "OUT=" + output("B"), //
+ "threshold=" + threshold,
+ "num_blocks=" + numBlocks,
+ "blocking_method=" + (blockingMethod == BlockingMethod.LSH ? "lsh" : "naive"),
+ "num_hashtables=" + numLshHashtables,
+ "num_hyperplanes=" + numLshHyperplanes,
+ };
+
+ runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+
+ // LSH is not deterministic, so in this test we just assert that it runs and produces a file
+ if (blockingMethod == BlockingMethod.LSH) {
+ Assert.assertTrue(Files.exists(Paths.get(output("B"))));
+ return;
+ }
+
+ Files.copy(Paths.get(sourceDirectory + "expected.csv"), Paths.get(output("expected.csv")), StandardCopyOption.REPLACE_EXISTING);
+ Files.copy(Paths.get(sourceDirectory + "expected.csv.mtd"), Paths.get(output("expected.csv.mtd")), StandardCopyOption.REPLACE_EXISTING);
+
+ FrameBlock expectedPairs = readDMLFrameFromHDFS("expected.csv", Types.FileFormat.CSV);
+ FrameBlock predictedPairs = readDMLFrameFromHDFS("B", Types.FileFormat.CSV);
+
+
+ Iterator<Object[]> expectedIter = expectedPairs.getObjectRowIterator();
+ Iterator<Object[]> predictedIter = predictedPairs.getObjectRowIterator();
+
+ int row = 0;
+ while (expectedIter.hasNext()) {
+ Assert.assertTrue(predictedIter.hasNext());
+ Object[] expected = Arrays.copyOfRange(expectedIter.next(), 0, 2);
+ Object[] predicted = Arrays.copyOfRange(predictedIter.next(), 0, 2);
+ Assert.assertArrayEquals("Row " + row + " differs.", expected, predicted);
+ row++;
+ }
+ Assert.assertEquals(expectedPairs.getNumRows(), predictedPairs.getNumRows());
+ tearDown();
+ }
+}
diff --git a/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java b/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java
new file mode 100644
index 0000000..44fee8b
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/applications/EntityResolutionConnectedComponentsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysds.test.applications;
+
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.junit.Test;
+
+public class EntityResolutionConnectedComponentsTest extends AutomatedTestBase {
+ private final static String TEST_NAME = "EntityResolutionConnectedComponents";
+ private final static String TEST_DIR = "applications/entity_resolution/connected_components/";
+
+ @Override
+ public void setUp() {
+ addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_DIR, "cluster_by_connected_components", new String[]{"B"}));
+ }
+
+ @Test
+ public void testConnectedComponents1() {
+ testClusterByConnectedComponent(
+ new double[][]{{0,},},
+ new double[][]{{0,}}
+ );
+ }
+ @Test
+ public void testConnectedComponents2() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 0},
+ {0, 0},
+ },
+ new double[][]{
+ {0, 0},
+ {0, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents3() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 1},
+ {1, 0},
+ },
+ new double[][]{
+ {0, 1},
+ {1, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents4() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 1, 0},
+ {1, 0, 1},
+ {0, 1, 0},
+ },
+ new double[][]{
+ {0, 1, 1},
+ {1, 0, 1},
+ {1, 1, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents5() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 0, 1, 0, 0, 0},
+ {0, 0, 0, 1, 0, 0},
+ {1, 0, 0, 0, 1, 0},
+ {0, 1, 0, 0, 0, 0},
+ {0, 0, 1, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0},
+ },
+ new double[][]{
+ {0, 0, 1, 0, 1, 0},
+ {0, 0, 0, 1, 0, 0},
+ {1, 0, 0, 0, 1, 0},
+ {0, 1, 0, 0, 0, 0},
+ {1, 0, 1, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents6() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ },
+ new double[][]{
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 0, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents7() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 1, 0, 1, 0, 0, 0},
+ {1, 0, 1, 1, 0, 0, 0},
+ {0, 1, 0, 0, 0, 0, 0},
+ {1, 1, 0, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 1, 1},
+ {0, 0, 0, 0, 1, 0, 1},
+ {0, 0, 0, 0, 1, 1, 0},
+ },
+ new double[][]{
+ {0, 1, 1, 1, 0, 0, 0},
+ {1, 0, 1, 1, 0, 0, 0},
+ {1, 1, 0, 1, 0, 0, 0},
+ {1, 1, 1, 0, 0, 0, 0},
+ {0, 0, 0, 0, 0, 1, 1},
+ {0, 0, 0, 0, 1, 0, 1},
+ {0, 0, 0, 0, 1, 1, 0},
+ }
+ );
+ }
+ @Test
+ public void testConnectedComponents8() {
+ testClusterByConnectedComponent(
+ new double[][]{
+ {0, 1, 1, 1, 1, 1, 1},
+ {1, 0, 1, 1, 1, 1, 1},
+ {1, 1, 0, 1, 1, 1, 1},
+ {1, 1, 1, 0, 1, 1, 1},
+ {1, 1, 1, 1, 0, 1, 1},
+ {1, 1, 1, 1, 1, 0, 1},
+ {1, 1, 1, 1, 1, 1, 0},
+ },
+ new double[][]{
+ {0, 1, 1, 1, 1, 1, 1},
+ {1, 0, 1, 1, 1, 1, 1},
+ {1, 1, 0, 1, 1, 1, 1},
+ {1, 1, 1, 0, 1, 1, 1},
+ {1, 1, 1, 1, 0, 1, 1},
+ {1, 1, 1, 1, 1, 0, 1},
+ {1, 1, 1, 1, 1, 1, 0},
+ }
+ );
+ }
+
+ public void testClusterByConnectedComponent(double[][] adjacencyMatrix, double[][] expectedMatrix) {
+ TestConfiguration config = getTestConfiguration(TEST_NAME);
+ loadTestConfiguration(config);
+ fullDMLScriptName = SCRIPT_DIR + TEST_DIR + config.getTestScript() + ".dml";
+ programArgs = new String[]{"-nvargs",
+ "inFile=" + input("A"), "outFile=" + output("B")};
+ writeInputMatrixWithMTD("A", adjacencyMatrix, false);
+ writeExpectedMatrix("B", expectedMatrix);
+ runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+ compareResults(0.01);
+ }
+}
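The expected matrices in the tests above encode the semantics of the new clustering primitive: every connected component of the input graph becomes a fully connected clique (without self-loops), while isolated vertices stay unconnected. As a reference, here is a minimal Python sketch of that behavior, independent of the DML implementation (function name and list-of-lists representation are illustrative only):

```python
def cluster_by_connected_components(adj):
    """Turn each connected component of an undirected graph into a clique.

    adj: square 0/1 adjacency matrix as a list of lists, no self-loops.
    Returns the clique adjacency matrix, mirroring the expected
    matrices in the tests above.
    """
    n = len(adj)
    labels = [-1] * n  # component id per vertex, -1 = unvisited
    comp = 0
    for start in range(n):
        if labels[start] != -1:
            continue
        stack = [start]  # iterative flood fill from 'start'
        labels[start] = comp
        while stack:
            v = stack.pop()
            for w in range(n):
                if adj[v][w] != 0 and labels[w] == -1:
                    labels[w] = comp
                    stack.append(w)
        comp += 1
    # Every pair of distinct vertices in the same component gets an edge.
    return [[1 if i != j and labels[i] == labels[j] else 0
             for j in range(n)]
            for i in range(n)]
```

For example, the path graph of testConnectedComponents4 becomes a triangle, and the two separate components of testConnectedComponents7 each become a clique, matching the expected matrices above.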
diff --git a/src/test/scripts/applications/entity_resolution/binary/expected.csv b/src/test/scripts/applications/entity_resolution/binary/expected.csv
new file mode 100644
index 0000000..fe9af4f
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/expected.csv
@@ -0,0 +1,6 @@
+itemA,itemC,1.0
+itemA,itemD,1.0
+itemC,itemA,1.0
+itemC,itemD,1.0
+itemD,itemA,1.0
+itemD,itemC,1.0
diff --git a/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd b/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd
new file mode 100644
index 0000000..fc4fde8
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/expected.csv.mtd
@@ -0,0 +1,7 @@
+{
+ "data_type": "frame",
+ "schema": "STRING,STRING,FP64,",
+ "rows": 6,
+ "cols": 3,
+ "format": "csv"
+}
\ No newline at end of file
diff --git a/src/test/scripts/applications/entity_resolution/binary/input.csv b/src/test/scripts/applications/entity_resolution/binary/input.csv
new file mode 100644
index 0000000..51d4bc9
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/input.csv
@@ -0,0 +1,7 @@
+itemA,tokenA,1
+itemA,tokenC,1
+itemB,tokenB,1
+itemC,tokenC,1
+itemC,tokenD,2
+itemD,tokenA,1
+itemD,tokenD,2
diff --git a/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd b/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd
new file mode 100644
index 0000000..1a72ed3
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/binary/input.csv.mtd
@@ -0,0 +1,7 @@
+{
+ "data_type": "frame",
+ "rows": 7,
+ "cols": 3,
+ "format": "csv",
+ "header": false
+}
\ No newline at end of file
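The input fixture above lists (item, token, weight) triples, and expected.csv lists the symmetric matched pairs with score 1.0: itemA/itemC share tokenC, itemA/itemD share tokenA, itemC/itemD share tokenD, while itemB shares no token and matches nothing. The actual similarity measure and threshold of the DML pipeline are not visible in this diff excerpt; one plausible reading, which reproduces the expected pairs, is cosine similarity over the weighted token vectors with an assumed threshold of 0.3 (both the threshold and the function name below are hypothetical):

```python
import math
from collections import defaultdict

# Hypothetical threshold; the value actually used by the staged
# DML pipeline is not visible in this diff excerpt.
THRESHOLD = 0.3

def match_pairs(triples, threshold=THRESHOLD):
    """Emit symmetric (item, item, 1.0) pairs whose weighted token
    vectors have cosine similarity above 'threshold'.

    triples: iterable of (item, token, weight), as in input.csv above.
    """
    vectors = defaultdict(dict)
    for item, token, weight in triples:
        vectors[item][token] = float(weight)
    items = sorted(vectors)
    pairs = []
    for a in items:
        for b in items:
            if a == b:
                continue
            va, vb = vectors[a], vectors[b]
            dot = sum(va[t] * vb[t] for t in va if t in vb)
            norm = math.sqrt(sum(x * x for x in va.values())) \
                 * math.sqrt(sum(x * x for x in vb.values()))
            if norm > 0 and dot / norm > threshold:
                pairs.append((a, b, 1.0))
    return pairs
```

Under these assumptions, itemA/itemC and itemA/itemD score 1/sqrt(10) (about 0.316) and itemC/itemD scores 0.8, so exactly the six directed pairs of expected.csv are emitted.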
diff --git a/.github/workflows/applicationTests.yml b/src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
similarity index 60%
copy from .github/workflows/applicationTests.yml
copy to src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
index 652b31a..642561a 100644
--- a/.github/workflows/applicationTests.yml
+++ b/src/test/scripts/applications/entity_resolution/blocking/blocking_naive.dml
@@ -19,31 +19,8 @@
#
#-------------------------------------------------------------
-name: Application Test
+source("scripts/staging/entity-resolution/primitives/blocking.dml") as blocking;
-on:
- push:
- branches:
- - master
- pull_request:
- branches:
- - master
-
-jobs:
- applicationsTests:
- runs-on: ${{ matrix.os }}
- strategy:
- fail-fast: false
- matrix:
- tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
- os: [ubuntu-latest]
- name: Ap Test ${{ matrix.tests }}
- steps:
- - name: Checkout Repository
- uses: actions/checkout@v2
-
- - name: Run all tests starting with "${{ matrix.tests }}"
- uses: ./.github/action/
- id: test
- with:
- test-to-run: org.apache.sysds.test.applications.${{ matrix.tests }}**
+A = read($inFile);
+B = blocking::naive_blocking(A, $targetNumBlocks);
+write(B, $outFile);
\ No newline at end of file
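The test script above reads a frame, calls blocking::naive_blocking with a $targetNumBlocks parameter, and writes the result. The primitive's exact contract is defined in scripts/staging/entity-resolution/primitives/blocking.dml rather than in this excerpt; a guess at the idea, for orientation only, is a partition of the rows into the requested number of contiguous, roughly equal-sized blocks (the real primitive may differ, e.g. it may sort rows first or return block boundary indices):

```python
def naive_blocking(rows, target_num_blocks):
    """Split 'rows' into roughly equal-sized contiguous blocks.

    A sketch of one plausible semantics for blocking::naive_blocking;
    not taken from the DML source. Caps the block count at the number
    of rows so no empty blocks are produced.
    """
    n = len(rows)
    num_blocks = max(1, min(target_num_blocks, n))
    size = -(-n // num_blocks)  # ceiling division
    return [rows[i:i + size] for i in range(0, n, size)]
```

Blocking of this kind bounds the quadratic pairwise comparison cost of entity resolution: candidate pairs are only generated within a block, not across the whole dataset.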
diff --git a/src/test/scripts/applications/entity_resolution/clustering/expected.csv b/src/test/scripts/applications/entity_resolution/clustering/expected.csv
new file mode 100644
index 0000000..fe9af4f
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/expected.csv
@@ -0,0 +1,6 @@
+itemA,itemC,1.0
+itemA,itemD,1.0
+itemC,itemA,1.0
+itemC,itemD,1.0
+itemD,itemA,1.0
+itemD,itemC,1.0
diff --git a/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd b/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd
new file mode 100644
index 0000000..fc4fde8
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/expected.csv.mtd
@@ -0,0 +1,7 @@
+{
+ "data_type": "frame",
+ "schema": "STRING,STRING,FP64,",
+ "rows": 6,
+ "cols": 3,
+ "format": "csv"
+}
\ No newline at end of file
diff --git a/src/test/scripts/applications/entity_resolution/clustering/input.csv b/src/test/scripts/applications/entity_resolution/clustering/input.csv
new file mode 100644
index 0000000..51d4bc9
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/input.csv
@@ -0,0 +1,7 @@
+itemA,tokenA,1
+itemA,tokenC,1
+itemB,tokenB,1
+itemC,tokenC,1
+itemC,tokenD,2
+itemD,tokenA,1
+itemD,tokenD,2
diff --git a/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd b/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd
new file mode 100644
index 0000000..1a72ed3
--- /dev/null
+++ b/src/test/scripts/applications/entity_resolution/clustering/input.csv.mtd
@@ -0,0 +1,7 @@
+{
+ "data_type": "frame",
+ "rows": 7,
+ "cols": 3,
+ "format": "csv",
+ "header": false
+}
\ No newline at end of file
diff --git a/.github/workflows/applicationTests.yml b/src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
similarity index 60%
copy from .github/workflows/applicationTests.yml
copy to src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
index 652b31a..be51c02 100644
--- a/.github/workflows/applicationTests.yml
+++ b/src/test/scripts/applications/entity_resolution/connected_components/cluster_by_connected_components.dml
@@ -19,31 +19,8 @@
#
#-------------------------------------------------------------
-name: Application Test
+source("scripts/staging/entity-resolution/primitives/clustering.dml") as cluster;
-on:
- push:
- branches:
- - master
- pull_request:
- branches:
- - master
-
-jobs:
- applicationsTests:
- runs-on: ${{ matrix.os }}
- strategy:
- fail-fast: false
- matrix:
- tests: [A,B,C,G,H,I,L,M,N,O,P,S,U,W]
- os: [ubuntu-latest]
- name: Ap Test ${{ matrix.tests }}
- steps:
- - name: Checkout Repository
- uses: actions/checkout@v2
-
- - name: Run all tests starting with "${{ matrix.tests }}"
- uses: ./.github/action/
- id: test
- with:
- test-to-run: org.apache.sysds.test.applications.${{ matrix.tests }}**
+A = read($inFile);
+B = cluster::cluster_by_connected_components(A);
+write(B, $outFile);
\ No newline at end of file