You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2017/04/05 18:14:44 UTC

incubator-systemml git commit: [SYSTEMML-1462] Extract breast cancer preprocessing code into a Python package + script

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 23709ec60 -> e6555bbbc


[SYSTEMML-1462] Extract breast cancer preprocessing code into a Python package + script

This commit extracts the breast cancer project preprocessing functions
into a new `breastcancer` Python package, along with a new `preprocess.py`
script.  The notebook has all functions removed, leaving only the settings
and execution of the preprocessing using the new package, and the new script
is essentially a copy of that for use with `spark-submit`.

Closes #452.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e6555bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e6555bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e6555bbb

Branch: refs/heads/master
Commit: e6555bbbc8478b36781a1e37660f140219e2db73
Parents: 23709ec
Author: Mike Dusenberry <mw...@us.ibm.com>
Authored: Wed Apr 5 11:12:37 2017 -0700
Committer: Mike Dusenberry <mw...@us.ibm.com>
Committed: Wed Apr 5 11:12:37 2017 -0700

----------------------------------------------------------------------
 projects/breast_cancer/Preprocessing.ipynb      | 824 +------------------
 projects/breast_cancer/README.md                |  63 +-
 .../breast_cancer/breastcancer/preprocessing.py | 557 +++++++++++++
 .../breast_cancer/breastcancer/visualization.py |  69 ++
 projects/breast_cancer/preprocess.py            | 148 ++++
 5 files changed, 829 insertions(+), 832 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e6555bbb/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index 90d71b8..9c6850b 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -36,806 +36,24 @@
     "%autoreload 2\n",
     "%matplotlib inline\n",
     "\n",
-    "import math\n",
+    "import os\n",
+    "import shutil\n",
     "\n",
     "import matplotlib.pyplot as plt\n",
     "import numpy as np\n",
-    "import openslide\n",
-    "from openslide.deepzoom import DeepZoomGenerator\n",
-    "import pandas as pd\n",
-    "from pyspark.ml.linalg import Vectors\n",
-    "import pyspark.sql.functions as F\n",
-    "from scipy.ndimage.morphology import binary_fill_holes\n",
-    "from skimage.color import rgb2gray\n",
-    "from skimage.feature import canny\n",
-    "from skimage.morphology import binary_closing, binary_dilation, disk\n",
     "\n",
-    "plt.rcParams['figure.figsize'] = (10, 6)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Open Whole-Slide Image"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def open_slide(slide_num, folder, training):\n",
-    "  \"\"\"\n",
-    "  Open a whole-slide image, given an image number.\n",
-    "  \n",
-    "  Args:\n",
-    "    slide_num: Slide image number as an integer.\n",
-    "    folder: Directory in which the slides folder is stored, as a string.\n",
-    "      This should contain either a `training_image_data` folder with\n",
-    "      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
-    "      folder with images in the format `TUPAC-TE-###.svs`.\n",
-    "    training: Boolean for training or testing datasets.\n",
-    "  \n",
-    "  Returns:\n",
-    "    An OpenSlide object representing a whole-slide image.\n",
-    "  \"\"\"\n",
-    "  if training:\n",
-    "    filename = os.path.join(folder, \"training_image_data\",\n",
-    "                            \"TUPAC-TR-{}.svs\".format(str(slide_num).zfill(3)))\n",
-    "  else:\n",
-    "    # Testing images\n",
-    "    filename = os.path.join(folder, \"testing_image_data\",\n",
-    "                            \"TUPAC-TE-{}.svs\".format(str(slide_num).zfill(3)))\n",
-    "  slide = openslide.open_slide(filename)\n",
-    "  return slide"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Create Tile Generator"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def create_tile_generator(slide, tile_size, overlap):\n",
-    "  \"\"\"\n",
-    "  Create a tile generator for the given slide.\n",
-    "  \n",
-    "  This generator is able to extract tiles from the overall\n",
-    "  whole-slide image.\n",
-    "  \n",
-    "  Args:\n",
-    "    slide: An OpenSlide object representing a whole-slide image.\n",
-    "    tile_size: The width and height of a square tile to be generated.\n",
-    "    overlap: Number of pixels by which to overlap the tiles.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A DeepZoomGenerator object representing the tile generator. Each\n",
-    "    extracted tile is an Image with shape (tile_size, tile_size, channels).\n",
-    "    Note: This generator is not a true \"Python generator function\", but\n",
-    "    rather is an object that is capable of extracting individual tiles.\n",
-    "  \"\"\"\n",
-    "  generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)\n",
-    "  return generator"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Determine 20x Magnification Zoom Level"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def get_20x_zoom_level(slide, generator):\n",
-    "  \"\"\"\n",
-    "  Return the zoom level that corresponds to a 20x magnification.\n",
-    "  \n",
-    "  The generator can extract tiles from multiple zoom levels, downsampling\n",
-    "  by a factor of 2 per level from highest to lowest resolution.\n",
-    "  \n",
-    "  Args:\n",
-    "    slide: An OpenSlide object representing a whole-slide image.\n",
-    "    generator: A DeepZoomGenerator object representing a tile generator.\n",
-    "      Note: This generator is not a true \"Python generator function\", but\n",
-    "      rather is an object that is capable of extracting individual tiles.\n",
-    "  \n",
-    "  Returns:\n",
-    "    Zoom level corresponding to a 20x magnification, or as close as possible.\n",
-    "  \"\"\"\n",
-    "  highest_zoom_level = generator.level_count - 1  # 0-based indexing\n",
-    "  try:\n",
-    "    mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])\n",
-    "    # `mag / 20` gives the downsampling factor between the slide's\n",
-    "    # magnification and the desired 20x magnification.\n",
-    "    # `(mag / 20) / 2` gives the zoom level offset from the highest\n",
-    "    # resolution level, based on a 2x downsampling factor in the\n",
-    "    # generator.\n",
-    "    offset = math.floor((mag / 20) / 2)\n",
-    "    level = highest_zoom_level - offset\n",
-    "  except ValueError:\n",
-    "    # In case the slide magnification level is unknown, just\n",
-    "    # use the highest resolution.\n",
-    "    level = highest_zoom_level\n",
-    "  return level"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Generate Tile Indices For Whole-Slide Image."
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def process_slide(slide_num, folder, training, tile_size, overlap):\n",
-    "  \"\"\"\n",
-    "  Generate all possible tile indices for a whole-slide image.\n",
-    "  \n",
-    "  Given a slide number, tile size, and overlap, generate\n",
-    "  all possible (slide_num, tile_size, overlap, zoom_level, col, row)\n",
-    "  indices.\n",
-    "  \n",
-    "  Args:\n",
-    "    slide_num: Slide image number as an integer.\n",
-    "    folder: Directory in which the slides folder is stored, as a string.\n",
-    "      This should contain either a `training_image_data` folder with\n",
-    "      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
-    "      folder with images in the format `TUPAC-TE-###.svs`.\n",
-    "    training: Boolean for training or testing datasets.\n",
-    "    tile_size: The width and height of a square tile to be generated.\n",
-    "    overlap: Number of pixels by which to overlap the tiles.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A list of (slide_num, tile_size, overlap, zoom_level, col, row)\n",
-    "    integer index tuples representing possible tiles to extract.\n",
-    "  \"\"\"\n",
-    "  # Open slide.\n",
-    "  slide = open_slide(slide_num, folder, training)\n",
-    "  # Create tile generator.\n",
-    "  generator = create_tile_generator(slide, tile_size, overlap)\n",
-    "  # Get 20x zoom level.\n",
-    "  zoom_level = get_20x_zoom_level(slide, generator)\n",
-    "  # Generate all possible (zoom_level, col, row) tile index tuples.\n",
-    "  cols, rows = generator.level_tiles[zoom_level]\n",
-    "  tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)\n",
-    "                  for col in range(cols) for row in range(rows)]\n",
-    "  return tile_indices"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Generate Tile From Tile Index"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def process_tile_index(tile_index, folder, training):\n",
-    "  \"\"\"\n",
-    "  Generate a tile from a tile index.\n",
-    "  \n",
-    "  Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile\n",
-    "  index, generate a (slide_num, tile) tuple.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)\n",
-    "      integer index tuple representing a tile to extract.\n",
-    "    folder: Directory in which the slides folder is stored, as a string.\n",
-    "      This should contain either a `training_image_data` folder with\n",
-    "      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
-    "      folder with images in the format `TUPAC-TE-###.svs`.\n",
-    "    training: Boolean for training or testing datasets.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A (slide_num, tile) tuple, where slide_num is an integer, and tile\n",
-    "    is a 3D NumPy array of shape (tile_size, tile_size, channels) in\n",
-    "    RGB format.\n",
-    "  \"\"\"\n",
-    "  slide_num, tile_size, overlap, zoom_level, col, row = tile_index\n",
-    "  # Open slide.\n",
-    "  slide = open_slide(slide_num, folder, training)\n",
-    "  # Create tile generator.\n",
-    "  generator = create_tile_generator(slide, tile_size, overlap)\n",
-    "  # Generate tile.\n",
-    "  tile = np.array(generator.get_tile(zoom_level, (col, row)))\n",
-    "  return (slide_num, tile)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Filter Tile For Dimensions & Tissue Threshold"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def optical_density(tile):\n",
-    "  \"\"\"\n",
-    "  Convert a tile to optical density values.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).\n",
-    "  \n",
-    "  Returns:\n",
-    "    A 3D NumPy array of shape (tile_size, tile_size, channels) representing\n",
-    "    optical density values.\n",
-    "  \"\"\"\n",
-    "  tile = tile.astype(np.float64)\n",
-    "  #od = -np.log10(tile/255 + 1e-8)\n",
-    "  od = -np.log((tile+1)/240)\n",
-    "  return od\n",
+    "from breastcancer.preprocessing import preprocess, save, train_val_split\n",
     "\n",
-    "def keep_tile(tile_tuple, tile_size, tissue_threshold):\n",
-    "  \"\"\"\n",
-    "  Determine if a tile should be kept.\n",
-    "  \n",
-    "  This filters out tiles based on size and a tissue percentage\n",
-    "  threshold, using a custom algorithm. If a tile has height &\n",
-    "  width equal to (tile_size, tile_size), and contains greater\n",
-    "  than or equal to the given percentage, then it will be kept;\n",
-    "  otherwise it will be filtered out.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile_tuple: A (slide_num, tile) tuple, where slide_num is an\n",
-    "      integer, and tile is a 3D NumPy array of shape \n",
-    "      (tile_size, tile_size, channels) in RGB format.\n",
-    "    tile_size: The width and height of a square tile to be generated.\n",
-    "    tissue_threshold: Tissue percentage threshold.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A Boolean indicating whether or not a tile should be kept\n",
-    "    for future usage.\n",
-    "  \"\"\"\n",
-    "  slide_num, tile = tile_tuple\n",
-    "  if tile.shape[0:2] == (tile_size, tile_size):\n",
-    "    tile_orig = tile\n",
-    "    \n",
-    "    # Check 1\n",
-    "    # Convert 3D RGB image to 2D grayscale image, from\n",
-    "    # 0 (dense tissue) to 1 (plain background).\n",
-    "    tile = rgb2gray(tile)\n",
-    "    # 8-bit depth complement, from 1 (dense tissue)\n",
-    "    # to 0 (plain background).\n",
-    "    tile = 1 - tile\n",
-    "    # Canny edge detection with hysteresis thresholding.\n",
-    "    # This returns a binary map of edges, with 1 equal to\n",
-    "    # an edge. The idea is that tissue would be full of\n",
-    "    # edges, while background would not.\n",
-    "    tile = canny(tile)\n",
-    "    # Binary closing, which is a dilation followed by\n",
-    "    # an erosion. This removes small dark spots, which\n",
-    "    # helps remove noise in the background.\n",
-    "    tile = binary_closing(tile, disk(10))\n",
-    "    # Binary dilation, which enlarges bright areas,\n",
-    "    # and shrinks dark areas. This helps fill in holes\n",
-    "    # within regions of tissue.\n",
-    "    tile = binary_dilation(tile, disk(10))\n",
-    "    # Fill remaining holes within regions of tissue.\n",
-    "    tile = binary_fill_holes(tile)\n",
-    "    # Calculate percentage of tissue coverage.\n",
-    "    percentage = tile.mean()\n",
-    "    check1 = percentage >= tissue_threshold\n",
-    "    \n",
-    "    # Check 2\n",
-    "    # Convert to optical density values\n",
-    "    tile = optical_density(tile_orig)\n",
-    "    # Threshold at beta\n",
-    "    beta = 0.15\n",
-    "    tile = np.min(tile, axis=2) >= beta\n",
-    "    # Apply morphology for same reasons as above.\n",
-    "    tile = binary_closing(tile, disk(2))\n",
-    "    tile = binary_dilation(tile, disk(2))\n",
-    "    tile = binary_fill_holes(tile)\n",
-    "    percentage = tile.mean()\n",
-    "    check2 = percentage >= tissue_threshold\n",
-    "    \n",
-    "    return check1 and check2\n",
-    "  else:\n",
-    "    return False"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Generate Flattened Samples From Tile"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def process_tile(tile_tuple, sample_size, grayscale):\n",
-    "  \"\"\"\n",
-    "  Process a tile into a group of smaller samples.\n",
-    "  \n",
-    "  Cut up a tile into smaller blocks of sample_size x sample_size pixels,\n",
-    "  change the shape of each sample from (H, W, channels) to \n",
-    "  (channels, H, W), then flatten each into a vector of length\n",
-    "  channels*H*W.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile_tuple: A (slide_num, tile) tuple, where slide_num is an\n",
-    "    integer, and tile is a 3D NumPy array of shape \n",
-    "    (tile_size, tile_size, channels).\n",
-    "    sample_size: The new width and height of the square samples to be\n",
-    "      generated.\n",
-    "    grayscale: Whether or not to generate grayscale samples, rather\n",
-    "      than RGB.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A list of (slide_num, sample) tuples representing cut up tiles,\n",
-    "    where each sample has been transposed from\n",
-    "    (sample_size_x, sample_size_y, channels) to (channels, sample_size_x, sample_size_y),\n",
-    "    and flattened to a vector of length (channels*sample_size_x*sample_size_y).\n",
-    "  \"\"\"\n",
-    "  slide_num, tile = tile_tuple\n",
-    "  if grayscale:\n",
-    "    tile = rgb2gray(tile)[:, :, np.newaxis]  # Grayscale\n",
-    "    # Save disk space and future IO time by converting from [0,1] to [0,255],\n",
-    "    # at the expense of some minor loss of information.\n",
-    "    tile = np.round(tile * 255).astype(\"uint8\")\n",
-    "  x, y, ch = tile.shape\n",
-    "  # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where\n",
-    "  # num_x and num_y are the number of chopped tiles on the x and y axes, respectively.\n",
-    "  # 2. Swap sample_size_x and num_y axes to create\n",
-    "  # (num_x, num_y, sample_size_x, sample_size_y, ch).\n",
-    "  # 3. Combine num_x and num_y into single axis, returning\n",
-    "  # (num_samples, sample_size_x, sample_size_y, ch).\n",
-    "  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to\n",
-    "  # (num_samples, ch, sample_size_x, sample_size_y).\n",
-    "  # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).\n",
-    "  samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))\n",
-    "                 .swapaxes(1,2)\n",
-    "                 .reshape((-1, sample_size, sample_size, ch))\n",
-    "                 .transpose(0,3,1,2))\n",
-    "  samples = samples.reshape(samples.shape[0], -1)\n",
-    "  samples = [(slide_num, sample) for sample in list(samples)]\n",
-    "  return samples"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Get Ground Truth Labels"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def get_labels_df(folder):\n",
-    "  \"\"\"\n",
-    "  Create a DataFrame with the ground truth labels for each slide.\n",
-    "  \n",
-    "  Args:\n",
-    "    folder: Directory containing a `training_ground_truth.csv` file\n",
-    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
-    "      labels for each slide.\n",
+    "# Ship a fresh copy of the `breastcancer` package to the Spark workers.\n",
+    "# Note: The zip must include the `breastcancer` directory itself,\n",
+    "# as well as all files within it for `addPyFile` to work correctly.\n",
+    "# This is equivalent to `zip -r breastcancer.zip breastcancer`.\n",
+    "dirname = \"breastcancer\"\n",
+    "zipname = dirname + \".zip\"\n",
+    "shutil.make_archive(dirname, 'zip', dirname + \"/..\", dirname)\n",
+    "spark.sparkContext.addPyFile(zipname)\n",
     "\n",
-    "  Returns:\n",
-    "    A Pandas DataFrame containing the ground truth labels for each slide.\n",
-    "  \"\"\"\n",
-    "  filepath = os.path.join(folder, \"training_ground_truth.csv\")\n",
-    "  labels_df = pd.read_csv(filepath, names=[\"tumor_score\", \"molecular_score\"], header=None)\n",
-    "  labels_df[\"slide_num\"] = labels_df.index + 1  # slide numbering starts at 1\n",
-    "  labels_df.set_index(\"slide_num\", drop=False, inplace=True)  # use the slide num as index\n",
-    "  return labels_df"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Process All Slides Into A Spark DataFrame"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def preprocess(slide_nums, folder=\"data\", training=True, tile_size=1024, overlap=0,\n",
-    "               tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):\n",
-    "  \"\"\"\n",
-    "  Preprocess a set of whole-slide images.\n",
-    "  \n",
-    "  Preprocess a set of whole-slide images as follows:\n",
-    "    1. Tile the slides into tiles of size (tile_size, tile_size, 3).\n",
-    "    2. Filter the tiles to remove unnecessary tissue.\n",
-    "    3. Cut the remaining tiles into samples of size (sample_size, sample_size, ch),\n",
-    "       where `ch` is 1 if `grayscale` is true, or 3 otherwise.\n",
-    "  \n",
-    "  Args:\n",
-    "    slide_nums: List of whole-slide numbers to process.\n",
-    "    folder: Local directory in which the slides folder and ground truth\n",
-    "      file is stored, as a string. This should contain a\n",
-    "      `training_image_data` folder with images in the format\n",
-    "      `TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file\n",
-    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
-    "      labels for each slide.  Alternatively, the folder should contain a\n",
-    "      `testing_image_data` folder with images in the format `TUPAC-TE-###.svs`.\n",
-    "    training: Boolean for training or testing datasets.\n",
-    "    tile_size: The width and height of a square tile to be generated.\n",
-    "    overlap: Number of pixels by which to overlap the tiles.\n",
-    "    tissue_threshold: Tissue percentage threshold for filtering.\n",
-    "    sample_size: The new width and height of the square samples to be\n",
-    "      generated.\n",
-    "    grayscale: Whether or not to generate grayscale samples, rather\n",
-    "      than RGB.\n",
-    "    num_partitions: Number of partitions to use during processing.\n",
-    "  \n",
-    "  Returns:\n",
-    "    A DataFrame in which each row contains the slide number, tumor score,\n",
-    "    molecular score, and the sample stretched out into a Vector.\n",
-    "  \"\"\"\n",
-    "  slides = sc.parallelize(slide_nums)\n",
-    "  # Create DataFrame of all tile locations and increase number of partitions\n",
-    "  # to avoid OOM during subsequent processing.\n",
-    "  tile_indices = (slides.flatMap(\n",
-    "      lambda slide: process_slide(slide, folder, training, tile_size, overlap)))\n",
-    "  # TODO: Explore computing the ideal paritition sizes based on projected number\n",
-    "  #   of tiles after filtering.  I.e. something like the following:\n",
-    "  #rows = tile_indices.count()\n",
-    "  #part_size = 128\n",
-    "  #channels = 1 if grayscale else 3\n",
-    "  #row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # size of one row in MB\n",
-    "  #rows_per_part = round(part_size / row_mb)\n",
-    "  #num_parts = rows / rows_per_part\n",
-    "  ## HACK: Force even partitioning by collecting and parallelizing -- for memory issues.\n",
-    "  ## Note: This was a PySpark bug with a fix in the master branch now.\n",
-    "  #tile_indices = tile_indices.collect()\n",
-    "  #tile_indices = sc.parallelize(tile_indices, num_partitions)\n",
-    "  ## END HACK\n",
-    "  tile_indices = tile_indices.repartition(num_partitions)\n",
-    "  tile_indices.cache()\n",
-    "  # Extract all tiles into a DataFrame, filter, and cut into smaller samples.\n",
-    "  tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))\n",
-    "  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
-    "  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
-    "  if training:\n",
-    "    # Append labels\n",
-    "    labels_df = get_labels_df(folder)\n",
-    "    samples_with_labels = (samples.map(\n",
-    "        lambda tup: (tup[0], int(labels_df.at[tup[0],\"tumor_score\"]),\n",
-    "                     float(labels_df.at[tup[0],\"molecular_score\"]), Vectors.dense(tup[1]))))\n",
-    "    df = samples_with_labels.toDF([\"slide_num\", \"tumor_score\", \"molecular_score\", \"sample\"])\n",
-    "    df = df.select(df.slide_num.astype(\"int\"), df.tumor_score.astype(\"int\"),\n",
-    "                   df.molecular_score, df[\"sample\"])\n",
-    "  else:  # testing data -- no labels\n",
-    "    df = samples.toDF([\"slide_num\", \"sample\"])\n",
-    "    df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
-    "  #df = df.repartition(num_partitions)  # HACK: Even out the partitions to avoid issues during saving\n",
-    "  return df"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def train_val_split(df, slide_nums, folder, train_frac=0.8, add_row_indices=True, seed=None, debug=False):\n",
-    "  \"\"\"\n",
-    "  Split a DataFrame of slide samples into training and validation sets.\n",
-    "  \n",
-    "  Args:\n",
-    "    df: A DataFrame in which each row contains the slide number,\n",
-    "    tumor score, molecular score, and the sample stretched out into\n",
-    "    a Vector.\n",
-    "    slide_nums: A list of slide numbers to sample from.\n",
-    "    folder: Directory containing a `training_ground_truth.csv` file\n",
-    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
-    "      labels for each slide.\n",
-    "    train_frac: Fraction of the data to assign to the training set, with\n",
-    "      `1-frac` assigned to the valiation set.\n",
-    "    add_row_indices: Boolean for whether or not to prepend an index\n",
-    "      column contain the row index for use downstream by SystemML.\n",
-    "      The column name will be \"__INDEX\".\n",
-    "    \n",
-    "    Returns:\n",
-    "      A DataFrame in which each row contains the slide number, tumor score,\n",
-    "      molecular score, and the sample stretched out into a Vector.\n",
-    "  \"\"\"\n",
-    "  # Create DataFrame of labels for the given slide numbers.\n",
-    "  labels_df = get_labels_df(folder)\n",
-    "  labels_df = labels_df.loc[slide_nums]\n",
-    "    \n",
-    "  # Randomly split slides 80%/20% into train and validation sets.\n",
-    "  train_nums_df = labels_df.sample(frac=train_frac, random_state=seed)\n",
-    "  val_nums_df = labels_df.drop(train_nums_df.index)\n",
-    "\n",
-    "  train_nums = (spark.createDataFrame(train_nums_df)\n",
-    "                     .selectExpr(\"cast(slide_num as int)\")\n",
-    "                     .coalesce(1))\n",
-    "  val_nums = (spark.createDataFrame(val_nums_df)\n",
-    "                   .selectExpr(\"cast(slide_num as int)\")\n",
-    "                   .coalesce(1))\n",
-    "\n",
-    "  # Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
-    "  # in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
-    "  # rather than the costly SortMergeJoin.\n",
-    "  train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
-    "  val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
-    "  \n",
-    "  if debug:\n",
-    "    # DEBUG: Sanity checks.\n",
-    "    assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
-    "    assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
-    "    assert train.join(val, on=\"slide_num\").count() == 0\n",
-    "    #  - Check distributions.\n",
-    "    for pdf in train_nums_df, val_nums_df:\n",
-    "      print(pdf.count())\n",
-    "      print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "      print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
-    "    #  - Check total number of examples in each.\n",
-    "    print(train.count(), val.count())\n",
-    "    #  - Check physical plans for broadcast join.\n",
-    "    print(train.explain(), val.explain())\n",
-    "  \n",
-    "  # Add row indices for use with SystemML.\n",
-    "  if add_row_indices:\n",
-    "    train = (train.rdd\n",
-    "                  .zipWithIndex()\n",
-    "                  .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "                  .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "    train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
-    "                         train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
-    "\n",
-    "    val = (val.rdd\n",
-    "              .zipWithIndex()\n",
-    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "    val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
-    "                     val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
-    "\n",
-    "  return train, val"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Visualize Tile"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def visualize_tile(tile):\n",
-    "  \"\"\"\n",
-    "  Plot a tissue tile.\n",
-    "  \n",
-    "  Args:\n",
-    "    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).\n",
-    "  \n",
-    "  Returns:\n",
-    "    None\n",
-    "  \"\"\"\n",
-    "  plt.imshow(tile)\n",
-    "  plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Visualize Sample"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": true,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def visualize_sample(sample, size=256):\n",
-    "  \"\"\"\n",
-    "  Plot a tissue sample.\n",
-    "  \n",
-    "  Args:\n",
-    "    sample: A square sample flattened to a vector of size\n",
-    "      (channels*size_x*size_y).\n",
-    "    size: The width and height of the square samples.\n",
-    "  \n",
-    "  Returns:\n",
-    "    None\n",
-    "  \"\"\"\n",
-    "  # Change type, reshape, transpose to (size_x, size_y, channels).\n",
-    "  length = sample.shape[0]\n",
-    "  channels = int(length / (size * size))\n",
-    "  if channels > 1:\n",
-    "    sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)\n",
-    "    plt.imshow(sample)\n",
-    "  else:\n",
-    "    vmax = 255 if sample.max() > 1 else 1\n",
-    "    sample = sample.reshape((size, size))\n",
-    "    plt.imshow(sample, cmap=\"gray\", vmin=0, vmax=vmax)\n",
-    "  plt.show()"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Save DataFrame"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "def save(df, filepath, sample_size, grayscale, mode=\"error\", format=\"parquet\", file_size=128):\n",
-    "  \"\"\"\n",
-    "  Save a preprocessed DataFrame with a constraint on the file sizes.\n",
-    "  \n",
-    "  Args:\n",
-    "    df: A DataFrame.\n",
-    "    filepath: Hadoop-supported path at which to save the DataFrame.\n",
-    "    sample_size: The width and height of the square samples.\n",
-    "    grayscale: Whether or not to the samples are in grayscale format, rather\n",
-    "      than RGB.\n",
-    "    mode: Specifies the behavior of `df.write.mode` when the data already exists.\n",
-    "      Options include:\n",
-    "        * `append`: Append contents of this :class:`DataFrame` to existing data.\n",
-    "        * `overwrite`: Overwrite existing data.\n",
-    "        * `error`: Throw an exception if data already exists.\n",
-    "        * `ignore`: Silently ignore this operation if data already exists.\n",
-    "    format: The format in which to save the DataFrame.\n",
-    "    file_size: Size in MB of each saved file.  128 MB is an empirically ideal size.\n",
-    "  \"\"\"\n",
-    "  channels = 1 if grayscale else 3\n",
-    "  row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # size of one row in MB\n",
-    "  rows_per_file = round(file_size / row_mb)\n",
-    "  df.write.option(\"maxRecordsPerFile\", rows_per_file).mode(mode).save(filepath, format=format)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "---"
+    "plt.rcParams['figure.figsize'] = (10, 6)"
    ]
   },
   {
@@ -884,15 +102,17 @@
     "sample_size = 256\n",
     "grayscale = False\n",
     "num_partitions = 20000\n",
-    "add_row_indices = False #True\n",
+    "add_row_indices = True\n",
     "train_frac = 0.8\n",
+    "split_seed = 24\n",
     "folder = \"/home/MDM/breast_cancer/data\"\n",
-    "# labels_df_path = os.path.join(folder, \"training_ground_truth.csv\")\n",
     "save_folder = \"data\"  # Hadoop-supported directory in which to save DataFrames\n",
     "df_path = os.path.join(save_folder, \"samples_{}_{}{}.parquet\".format(\n",
     "    \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\"))\n",
-    "train_df_path = os.path.join(save_folder, \"train_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))\n",
-    "val_df_path = os.path.join(save_folder, \"val_{}{}.parquet\".format(sample_size, \"_grayscale\" if grayscale else \"\"))"
+    "train_df_path = os.path.join(save_folder, \"train_{}{}.parquet\".format(sample_size,\n",
+    "    \"_grayscale\" if grayscale else \"\"))\n",
+    "val_df_path = os.path.join(save_folder, \"val_{}{}.parquet\".format(sample_size,\n",
+    "    \"_grayscale\" if grayscale else \"\"))"
    ]
   },
   {
@@ -906,8 +126,9 @@
    "outputs": [],
    "source": [
     "# Process all slides.\n",
-    "df = preprocess(slide_nums, tile_size=tile_size, sample_size=sample_size, grayscale=grayscale,\n",
-    "                training=training, num_partitions=num_partitions, folder=folder)"
+    "df = preprocess(spark, slide_nums, tile_size=tile_size, sample_size=sample_size,\n",
+    "                grayscale=grayscale, training=training, num_partitions=num_partitions,\n",
+    "                folder=folder)"
    ]
   },
   {
@@ -949,7 +170,8 @@
    "outputs": [],
    "source": [
     "# Split into train and validation DataFrames based On slide number\n",
-    "train, val = train_val_split(df, slide_nums, folder, train_frac, add_row_indices)"
+    "train, val = train_val_split(spark, df, slide_nums, folder, train_frac, add_row_indices,\n",
+    "                             seed=split_seed)"
    ]
   },
   {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e6555bbb/projects/breast_cancer/README.md
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/README.md b/projects/breast_cancer/README.md
index dbb626f..d1c3a2c 100644
--- a/projects/breast_cancer/README.md
+++ b/projects/breast_cancer/README.md
@@ -27,19 +27,19 @@ The [Tumor Proliferation Assessment Challenge 2016 (TUPAC16)](http://tupac.tue-i
 ## Background
 Breast cancer is the leading cause of cancerous death in women in less-developed countries, and is the second leading cause of cancerous deaths in developed countries, accounting for 29% of all cancers in women within the U.S. [1]. Survival rates increase as early detection increases, giving incentive for pathologists and the medical world at large to develop improved methods for even earlier detection [2].  There are many forms of breast cancer including Ductal Carcinoma in Situ (DCIS), Invasive Ductal Carcinoma (IDC), Tubular Carcinoma of the Breast, Medullary Carcinoma of the Breast, Invasive Lobular Carcinoma, Inflammatory Breast Cancer and several others [3]. Within all of these forms of breast cancer, the rate in which breast cancer cells grow (proliferation), is a strong indicator of a patient\u2019s prognosis. Although there are many means of determining the presence of breast cancer, tumor proliferation speed has been proven to help pathologists determine the treatment for the
  patient. The most common technique for determining the proliferation speed is through mitotic count (mitotic index) estimates, in which a pathologist counts the dividing cell nuclei in hematoxylin and eosin (H&E) stained slide preparations to determine the number of mitotic bodies.  Given this, the pathologist produces a proliferation score of either 1, 2, or 3, ranging from better to worse prognosis [4]. Unfortunately, this approach is known to have reproducibility problems due to the variability in counting, as well as the difficulty in distinguishing between different grades.
 
-References:  
-[1] http://emedicine.medscape.com/article/1947145-overview#a3  
-[2] http://emedicine.medscape.com/article/1947145-overview#a7  
-[3] http://emedicine.medscape.com/article/1954658-overview  
-[4] http://emedicine.medscape.com/article/1947145-workup#c12  
+References:
+[1] http://emedicine.medscape.com/article/1947145-overview#a3
+[2] http://emedicine.medscape.com/article/1947145-overview#a7
+[3] http://emedicine.medscape.com/article/1954658-overview
+[4] http://emedicine.medscape.com/article/1947145-workup#c12
 
 ## Goal & Approach
 In an effort to automate the process of classification, this project aims to develop a large-scale deep learning approach for predicting tumor scores directly from the pixels of whole-slide histopathology images.  Our proposed approach is based on a recent research paper from Stanford [1].  Starting with 500 extremely high-resolution tumor slide images with accompanying score labels, we aim to make use of Apache Spark in a preprocessing step to cut and filter the images into smaller square samples, generating 4.7 million samples for a total of ~7TB of data [2].  We then utilize Apache SystemML on top of Spark to develop and train a custom, large-scale, deep convolutional neural network on these samples, making use of the familiar linear algebra syntax and automatically-distributed execution of SystemML [3].  Our model takes as input the pixel values of the individual samples, and is trained to predict the correct tumor score classification for each one.  In addition to distributed l
 inear algebra, we aim to exploit task-parallelism via parallel for-loops for hyperparameter optimization, as well as hardware acceleration for faster training via a GPU-backed runtime.  Ultimately, we aim to develop a model that is sufficiently stronger than existing approaches for the task of breast cancer tumor proliferation score classification.
 
-References:  
-[1] https://web.stanford.edu/group/rubinlab/pubs/2243353.pdf  
-[2] See [`Preprocessing.ipynb`](Preprocessing.ipynb).  
-[3] See [`MachineLearning.ipynb`](MachineLearning.ipynb), [`softmax_clf.dml`](softmax_clf.dml), and [`convnet.dml`](convnet.dml).  
+References:
+[1] https://web.stanford.edu/group/rubinlab/pubs/2243353.pdf
+[2] See [`Preprocessing.ipynb`](Preprocessing.ipynb), and [`breastcancer/preprocessing.py`](breastcancer/preprocessing.py).
+[3] See [`MachineLearning.ipynb`](MachineLearning.ipynb), [`softmax_clf.dml`](softmax_clf.dml), and [`convnet.dml`](convnet.dml).
 
 ![Approach](https://apache.github.io/incubator-systemml/img/projects/breast_cancer/approach.svg)
 
@@ -65,32 +65,32 @@ References:
   * `cd incubator-systemml`
   * `mvn clean package`
   * `pip3 install -e src/main/python`
-* Create a `data` folder with the following contents (same location on *all* nodes):
+* Add the following to the `data` folder (same location on *all* nodes):
   * `training_image_data` folder with the training slides.
   * `testing_image_data` folder with the testing slides.
   * `training_ground_truth.csv` file containing the tumor & molecular scores for each slide.
-* Create a project folder (i.e. `breast_cancer`) with the following contents (only driver):
-  * All notebooks (`*.ipynb`).
-  * All DML scripts (`*.dml`).
-  * SystemML-NN installed as an `nn` folder containing the contents of `$SYSTEMML_HOME/scripts/staging/SystemML-NN/nn` (either copy & paste, or use a softlink).
-  * The `data` folder (or a softlink pointing to it).
-  * Layout:
+* Layout:
+  ```
+  - MachineLearning.ipynb
+  - Preprocessing.ipynb
+  - breastcancer/
+    - preprocessing.py
+    - visualization.py
+  - convnet.dml
+  - nn/
+  - ...
+  - data/
+    - training_ground_truth.csv
+    - training_image_data
+        - TUPAC-TR-001.svs
+        - TUPAC-TR-002.svs
+        - ...
+    - testing_image_data
+        - TUPAC-TE-001.svs
+        - TUPAC-TE-002.svs
+        - ...
+  ```
 
-    ```
-    - MachineLearning.ipynb
-    - Preprocessing.ipynb
-    - ...
-    - data/
-      - training_ground_truth.csv
-      - training_image_data
-          - TUPAC-TR-001.svs
-          - TUPAC-TR-002.svs
-          - ...
-      - testing_image_data
-          - TUPAC-TE-001.svs
-          - TUPAC-TE-002.svs
-          - ...
-    ```
 
 * Adjust the Spark settings in `$SPARK_HOME/conf/spark-defaults.conf` using the following examples, depending on the job being executed:
   * All jobs:
@@ -123,6 +123,7 @@ References:
     spark.executor.memory 100g
     ```
 
+* `cd` to this `breast_cancer` folder.
 * Start Jupyter + PySpark with the following command (could also use Yarn in client mode with `--master yarn --deploy-mode`):
   ```
   PYSPARK_PYTHON=python3 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark --master spark://MASTER_URL:7077 --driver-class-path $SYSTEMML_HOME/target/SystemML.jar --jars $SYSTEMML_HOME/target/SystemML.jar

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e6555bbb/projects/breast_cancer/breastcancer/preprocessing.py
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/breastcancer/preprocessing.py b/projects/breast_cancer/breastcancer/preprocessing.py
new file mode 100644
index 0000000..424d699
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/preprocessing.py
@@ -0,0 +1,557 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+"""
+Preprocessing -- Predicting Breast Cancer Proliferation Scores with
+Apache SystemML
+
+This module contains functions for the preprocessing phase of the
+breast cancer project.
+"""
+
+import math
+import os
+
+import numpy as np
+import openslide
+from openslide.deepzoom import DeepZoomGenerator
+import pandas as pd
+from pyspark.ml.linalg import Vectors
+import pyspark.sql.functions as F
+from scipy.ndimage.morphology import binary_fill_holes
+from skimage.color import rgb2gray
+from skimage.feature import canny
+from skimage.morphology import binary_closing, binary_dilation, disk
+
+
+# Open Whole-Slide Image
+
+def open_slide(slide_num, folder, training):
+  """
+  Open a whole-slide image, given an image number.
+  
+  Args:
+    slide_num: Slide image number as an integer.
+    folder: Directory in which the slides folder is stored, as a string.
+      This should contain either a `training_image_data` folder with
+      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
+      folder with images in the format `TUPAC-TE-###.svs`.
+    training: Boolean for training or testing datasets.
+  
+  Returns:
+    An OpenSlide object representing a whole-slide image.
+  """
+  if training:
+    filename = os.path.join(folder, "training_image_data",
+                            "TUPAC-TR-{}.svs".format(str(slide_num).zfill(3)))
+  else:
+    # Testing images
+    filename = os.path.join(folder, "testing_image_data",
+                            "TUPAC-TE-{}.svs".format(str(slide_num).zfill(3)))
+  slide = openslide.open_slide(filename)
+  return slide
+
+
+# Create Tile Generator
+
+def create_tile_generator(slide, tile_size, overlap):
+  """
+  Create a tile generator for the given slide.
+  
+  This generator is able to extract tiles from the overall
+  whole-slide image.
+  
+  Args:
+    slide: An OpenSlide object representing a whole-slide image.
+    tile_size: The width and height of a square tile to be generated.
+    overlap: Number of pixels by which to overlap the tiles.
+  
+  Returns:
+    A DeepZoomGenerator object representing the tile generator. Each
+    extracted tile is a PIL Image with shape
+    (tile_size, tile_size, channels).
+    Note: This generator is not a true "Python generator function", but
+    rather is an object that is capable of extracting individual tiles.
+  """
+  generator = DeepZoomGenerator(slide, tile_size=tile_size, overlap=overlap, limit_bounds=True)
+  return generator
+
+
+# Determine 20x Magnification Zoom Level
+
+def get_20x_zoom_level(slide, generator):
+  """
+  Return the zoom level that corresponds to a 20x magnification.
+  
+  The generator can extract tiles from multiple zoom levels,
+  downsampling by a factor of 2 per level from highest to lowest
+  resolution.
+  
+  Args:
+    slide: An OpenSlide object representing a whole-slide image.
+    generator: A DeepZoomGenerator object representing a tile generator.
+      Note: This generator is not a true "Python generator function",
+      but rather is an object that is capable of extracting individual
+      tiles.
+  
+  Returns:
+    Zoom level corresponding to a 20x magnification, or as close as
+    possible.
+  """
+  highest_zoom_level = generator.level_count - 1  # 0-based indexing
+  try:
+    mag = int(slide.properties[openslide.PROPERTY_NAME_OBJECTIVE_POWER])
+    # `mag / 20` gives the downsampling factor between the slide's
+    # magnification and the desired 20x magnification.
+    # `(mag / 20) / 2` gives the zoom level offset from the highest
+    # resolution level, based on a 2x downsampling factor in the
+    # generator.
+    offset = math.floor((mag / 20) / 2)
+    level = highest_zoom_level - offset
+  except ValueError:
+    # In case the slide magnification level is unknown, just
+    # use the highest resolution.
+    level = highest_zoom_level
+  return level
+
+
+# Generate Tile Indices For Whole-Slide Image.
+
+def process_slide(slide_num, folder, training, tile_size, overlap):
+  """
+  Generate all possible tile indices for a whole-slide image.
+  
+  Given a slide number, tile size, and overlap, generate
+  all possible (slide_num, tile_size, overlap, zoom_level, col, row)
+  indices.
+  
+  Args:
+    slide_num: Slide image number as an integer.
+    folder: Directory in which the slides folder is stored, as a string.
+      This should contain either a `training_image_data` folder with
+      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
+      folder with images in the format `TUPAC-TE-###.svs`.
+    training: Boolean for training or testing datasets.
+    tile_size: The width and height of a square tile to be generated.
+    overlap: Number of pixels by which to overlap the tiles.
+  
+  Returns:
+    A list of (slide_num, tile_size, overlap, zoom_level, col, row)
+    integer index tuples representing possible tiles to extract.
+  """
+  # Open slide.
+  slide = open_slide(slide_num, folder, training)
+  # Create tile generator.
+  generator = create_tile_generator(slide, tile_size, overlap)
+  # Get 20x zoom level.
+  zoom_level = get_20x_zoom_level(slide, generator)
+  # Generate all possible (zoom_level, col, row) tile index tuples.
+  cols, rows = generator.level_tiles[zoom_level]
+  tile_indices = [(slide_num, tile_size, overlap, zoom_level, col, row)
+                  for col in range(cols) for row in range(rows)]
+  return tile_indices
+
+
+# Generate Tile From Tile Index
+
+def process_tile_index(tile_index, folder, training):
+  """
+  Generate a tile from a tile index.
+  
+  Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
+  index, generate a (slide_num, tile) tuple.
+  
+  Args:
+    tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
+      integer index tuple representing a tile to extract.
+    folder: Directory in which the slides folder is stored, as a string.
+      This should contain either a `training_image_data` folder with
+      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
+      folder with images in the format `TUPAC-TE-###.svs`.
+    training: Boolean for training or testing datasets.
+  
+  Returns:
+    A (slide_num, tile) tuple, where slide_num is an integer, and tile
+    is a 3D NumPy array of shape (tile_size, tile_size, channels) in
+    RGB format.
+  """
+  slide_num, tile_size, overlap, zoom_level, col, row = tile_index
+  # Open slide.
+  slide = open_slide(slide_num, folder, training)
+  # Create tile generator.
+  generator = create_tile_generator(slide, tile_size, overlap)
+  # Generate tile.
+  tile = np.array(generator.get_tile(zoom_level, (col, row)))
+  return (slide_num, tile)
+
+
+# Filter Tile For Dimensions & Tissue Threshold
+
+def optical_density(tile):
+  """
+  Convert a tile to optical density values.
+  
+  Args:
+    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
+  
+  Returns:
+    A 3D NumPy array of shape (tile_size, tile_size, channels)
+    representing optical density values.
+  """
+  tile = tile.astype(np.float64)
+  #od = -np.log10(tile/255 + 1e-8)
+  od = -np.log((tile+1)/240)
+  return od
+
+
+def keep_tile(tile_tuple, tile_size, tissue_threshold):
+  """
+  Determine if a tile should be kept.
+  
+  This filters out tiles based on size and a tissue percentage
+  threshold, using a custom algorithm. If a tile has height &
+  width equal to (tile_size, tile_size), and contains greater
+  than or equal to the given percentage, then it will be kept;
+  otherwise it will be filtered out.
+  
+  Args:
+    tile_tuple: A (slide_num, tile) tuple, where slide_num is an
+      integer, and tile is a 3D NumPy array of shape 
+      (tile_size, tile_size, channels) in RGB format.
+    tile_size: The width and height of a square tile to be generated.
+    tissue_threshold: Tissue percentage threshold.
+  
+  Returns:
+    A Boolean indicating whether or not a tile should be kept for
+    future usage.
+  """
+  slide_num, tile = tile_tuple
+  if tile.shape[0:2] == (tile_size, tile_size):
+    tile_orig = tile
+    
+    # Check 1
+    # Convert 3D RGB image to 2D grayscale image, from
+    # 0 (dense tissue) to 1 (plain background).
+    tile = rgb2gray(tile)
+    # 8-bit depth complement, from 1 (dense tissue)
+    # to 0 (plain background).
+    tile = 1 - tile
+    # Canny edge detection with hysteresis thresholding.
+    # This returns a binary map of edges, with 1 equal to
+    # an edge. The idea is that tissue would be full of
+    # edges, while background would not.
+    tile = canny(tile)
+    # Binary closing, which is a dilation followed by
+    # an erosion. This removes small dark spots, which
+    # helps remove noise in the background.
+    tile = binary_closing(tile, disk(10))
+    # Binary dilation, which enlarges bright areas,
+    # and shrinks dark areas. This helps fill in holes
+    # within regions of tissue.
+    tile = binary_dilation(tile, disk(10))
+    # Fill remaining holes within regions of tissue.
+    tile = binary_fill_holes(tile)
+    # Calculate percentage of tissue coverage.
+    percentage = tile.mean()
+    check1 = percentage >= tissue_threshold
+    
+    # Check 2
+    # Convert to optical density values
+    tile = optical_density(tile_orig)
+    # Threshold at beta
+    beta = 0.15
+    tile = np.min(tile, axis=2) >= beta
+    # Apply morphology for same reasons as above.
+    tile = binary_closing(tile, disk(2))
+    tile = binary_dilation(tile, disk(2))
+    tile = binary_fill_holes(tile)
+    percentage = tile.mean()
+    check2 = percentage >= tissue_threshold
+    
+    return check1 and check2
+  else:
+    return False
+
+
+# Generate Flattened Samples From Tile
+
+def process_tile(tile_tuple, sample_size, grayscale):
+  """
+  Process a tile into a group of smaller samples.
+  
+  Cut up a tile into smaller blocks of sample_size x sample_size pixels,
+  change the shape of each sample from (H, W, channels) to 
+  (channels, H, W), then flatten each into a vector of length
+  channels*H*W.
+  
+  Args:
+    tile_tuple: A (slide_num, tile) tuple, where slide_num is an
+      integer, and tile is a 3D NumPy array of shape 
+      (tile_size, tile_size, channels).
+    sample_size: The new width and height of the square samples to be
+      generated.
+    grayscale: Whether or not to generate grayscale samples, rather
+      than RGB.
+  
+  Returns:
+    A list of (slide_num, sample) tuples representing cut up tiles,
+    where each sample has been transposed from
+    (sample_size_x, sample_size_y, channels) to
+    (channels, sample_size_x, sample_size_y),
+    and flattened to a vector of length
+    (channels*sample_size_x*sample_size_y).
+  """
+  slide_num, tile = tile_tuple
+  if grayscale:
+    tile = rgb2gray(tile)[:, :, np.newaxis]  # Grayscale
+    # Save disk space and future IO time by converting from [0,1] to [0,255],
+    # at the expense of some minor loss of information.
+    tile = np.round(tile * 255).astype("uint8")
+  x, y, ch = tile.shape
+  # 1. Reshape into a 5D array of (num_x, sample_size_x, num_y, sample_size_y, ch), where
+  # num_x and num_y are the number of chopped tiles on the x and y axes, respectively.
+  # 2. Swap sample_size_x and num_y axes to create
+  # (num_x, num_y, sample_size_x, sample_size_y, ch).
+  # 3. Combine num_x and num_y into single axis, returning
+  # (num_samples, sample_size_x, sample_size_y, ch).
+  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to
+  # (num_samples, ch, sample_size_x, sample_size_y).
+  # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).
+  samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
+                 .swapaxes(1,2)
+                 .reshape((-1, sample_size, sample_size, ch))
+                 .transpose(0,3,1,2))
+  samples = samples.reshape(samples.shape[0], -1)
+  samples = [(slide_num, sample) for sample in list(samples)]
+  return samples
+
+
+# Get Ground Truth Labels
+
+def get_labels_df(folder):
+  """
+  Create a DataFrame with the ground truth labels for each slide.
+  
+  Args:
+    folder: Directory containing a `training_ground_truth.csv` file
+      containing the ground truth "tumor_score" and "molecular_score"
+      labels for each slide.
+
+  Returns:
+    A Pandas DataFrame containing the ground truth labels for each
+    slide.
+  """
+  filepath = os.path.join(folder, "training_ground_truth.csv")
+  labels_df = pd.read_csv(filepath, names=["tumor_score", "molecular_score"], header=None)
+  labels_df["slide_num"] = labels_df.index + 1  # slide numbering starts at 1
+  labels_df.set_index("slide_num", drop=False, inplace=True)  # use the slide num as index
+  return labels_df
+
+
+# Process All Slides Into A Spark DataFrame
+
+def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
+               tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):
+  """
+  Preprocess a set of whole-slide images.
+  
+  Preprocess a set of whole-slide images as follows:
+    1. Tile the slides into tiles of size (tile_size, tile_size, 3).
+    2. Filter the tiles to remove unnecessary tissue.
+    3. Cut the remaining tiles into samples of size
+       (sample_size, sample_size, ch), where `ch` is 1 if `grayscale`
+       is true, or 3 otherwise.
+  
+  Args:
+    spark: SparkSession.
+    slide_nums: List of whole-slide numbers to process.
+    folder: Local directory in which the slides folder and ground truth
+      file is stored, as a string. This should contain a
+      `training_image_data` folder with images in the format
+      `TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file
+      containing the ground truth "tumor_score" and "molecular_score"
+      labels for each slide.  Alternatively, the folder should contain a
+      `testing_image_data` folder with images in the format
+      `TUPAC-TE-###.svs`.
+    training: Boolean for training or testing datasets.
+    tile_size: The width and height of a square tile to be generated.
+    overlap: Number of pixels by which to overlap the tiles.
+    tissue_threshold: Tissue percentage threshold for filtering.
+    sample_size: The new width and height of the square samples to be
+      generated.
+    grayscale: Whether or not to generate grayscale samples, rather
+      than RGB.
+    num_partitions: Number of partitions to use during processing.
+  
+  Returns:
+    A Spark DataFrame in which each row contains the slide number, tumor
+    score, molecular score, and the sample stretched out into a Vector.
+  """
+  slides = spark.sparkContext.parallelize(slide_nums)
+  # Create DataFrame of all tile locations and increase number of partitions
+  # to avoid OOM during subsequent processing.
+  tile_indices = (slides.flatMap(
+      lambda slide: process_slide(slide, folder, training, tile_size, overlap)))
+  # TODO: Explore computing the ideal paritition sizes based on projected number
+  #   of tiles after filtering.  I.e. something like the following:
+  #rows = tile_indices.count()
+  #part_size = 128
+  #channels = 1 if grayscale else 3
+  #row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # size of one row in MB
+  #rows_per_part = round(part_size / row_mb)
+  #num_parts = rows / rows_per_part
+  ## HACK: Force even partitioning by collecting and parallelizing -- for memory issues.
+  ## Note: This was a PySpark bug with a fix in the master branch now.
+  #tile_indices = tile_indices.collect()
+  #tile_indices = sc.parallelize(tile_indices, num_partitions)
+  ## END HACK
+  tile_indices = tile_indices.repartition(num_partitions)
+  tile_indices.cache()
+  # Extract all tiles into a DataFrame, filter, and cut into smaller samples.
+  tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
+  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
+  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
+  if training:
+    # Append labels
+    labels_df = get_labels_df(folder)
+    samples_with_labels = (samples.map(
+        lambda tup: (tup[0], int(labels_df.at[tup[0],"tumor_score"]),
+                     float(labels_df.at[tup[0],"molecular_score"]), Vectors.dense(tup[1]))))
+    df = samples_with_labels.toDF(["slide_num", "tumor_score", "molecular_score", "sample"])
+    df = df.select(df.slide_num.astype("int"), df.tumor_score.astype("int"),
+                   df.molecular_score, df["sample"])
+  else:  # testing data -- no labels
+    df = samples.toDF(["slide_num", "sample"])
+    df = df.select(df.slide_num.astype("int"), df["sample"])
+  #df = df.repartition(num_partitions)  # HACK: Even out the partitions to avoid saving issues
+  return df
+
+
+# Split Into Separate Train & Validation DataFrames Based On Slide Number
+
+def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indices=True, seed=None,
+                    debug=False):
+  """
+  Split a DataFrame of slide samples into training and validation sets.
+  
+  Args:
+    spark: SparkSession.
+    df: A Spark DataFrame in which each row contains the slide number,
+    tumor score, molecular score, and the sample stretched out into
+    a Vector.
+    slide_nums: A list of slide numbers to sample from.
+    folder: Directory containing a `training_ground_truth.csv` file
+      containing the ground truth "tumor_score" and "molecular_score"
+      labels for each slide.
+    train_frac: Fraction of the data to assign to the training set, with
+      `1-frac` assigned to the valiation set.
+    add_row_indices: Boolean for whether or not to prepend an index
+      column contain the row index for use downstream by SystemML.
+      The column name will be "__INDEX".
+    
+  Returns:
+    A Spark DataFrame in which each row contains the slide number, tumor
+    score, molecular score, and the sample stretched out into a Vector.
+  """
+  # Create DataFrame of labels for the given slide numbers.
+  labels_df = get_labels_df(folder)
+  labels_df = labels_df.loc[slide_nums]
+    
+  # Randomly split slides 80%/20% into train and validation sets.
+  train_nums_df = labels_df.sample(frac=train_frac, random_state=seed)
+  val_nums_df = labels_df.drop(train_nums_df.index)
+
+  train_nums = (spark.createDataFrame(train_nums_df)
+                     .selectExpr("cast(slide_num as int)")
+                     .coalesce(1))
+  val_nums = (spark.createDataFrame(val_nums_df)
+                   .selectExpr("cast(slide_num as int)")
+                   .coalesce(1))
+
+  # Note: Explicitly mark the smaller DataFrames as able to be broadcasted
+  # in order to have Catalyst choose the more efficient BroadcastHashJoin, 
+  # rather than the costly SortMergeJoin.
+  train = df.join(F.broadcast(train_nums), on="slide_num")
+  val = df.join(F.broadcast(val_nums), on="slide_num")
+  
+  if debug:
+    # DEBUG: Sanity checks.
+    assert len(pd.merge(train_nums_df, val_nums_df, on="slide_num")) == 0
+    assert train_nums.join(val_nums, on="slide_num").count() == 0
+    assert train.join(val, on="slide_num").count() == 0
+    #  - Check distributions.
+    for pdf in train_nums_df, val_nums_df:
+      print(pdf.count())
+      print(pdf["tumor_score"].value_counts(sort=False))
+      print(pdf["tumor_score"].value_counts(normalize=True, sort=False), "\n")
+    #  - Check total number of examples in each.
+    print(train.count(), val.count())
+    #  - Check physical plans for broadcast join.
+    print(train.explain(), val.explain())
+  
+  # Add row indices for use with SystemML.
+  if add_row_indices:
+    train = (train.rdd
+                  .zipWithIndex()
+                  .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing
+                  .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
+    train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"), 
+                         train.tumor_score.astype("int"), train.molecular_score, train["sample"])
+
+    val = (val.rdd
+              .zipWithIndex()
+              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing
+              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
+    val = val.select(val["__INDEX"].astype("int"), val.slide_num.astype("int"),
+                     val.tumor_score.astype("int"), val.molecular_score, val["sample"])
+
+  return train, val
+
+
+# Save DataFrame
+
+def save(df, filepath, sample_size, grayscale, mode="error", format="parquet", file_size=128):
+  """
+  Save a preprocessed DataFrame with a constraint on the file sizes.
+  
+  Args:
+    df: A Spark DataFrame.
+    filepath: Hadoop-supported path at which to save `df`.
+    sample_size: The width and height of the square samples.
+    grayscale: Whether or not to the samples are in grayscale format,
+      rather than RGB.
+    mode: Specifies the behavior of `df.write.mode` when the data
+      already exists.  Options include:
+        * `append`: Append contents of this DataFrame to
+          existing data.
+        * `overwrite`: Overwrite existing data.
+        * `error`: Throw an exception if data already exists.
+        * `ignore`: Silently ignore this operation if data already
+          exists.
+    format: The format in which to save the DataFrame.
+    file_size: Size in MB of each saved file.  128 MB is an
+      empirically ideal size.
+  """
+  channels = 1 if grayscale else 3
+  row_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # size of one row in MB
+  rows_per_file = round(file_size / row_mb)
+  df.write.option("maxRecordsPerFile", rows_per_file).mode(mode).save(filepath, format=format)
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e6555bbb/projects/breast_cancer/breastcancer/visualization.py
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/breastcancer/visualization.py b/projects/breast_cancer/breastcancer/visualization.py
new file mode 100644
index 0000000..6c578c9
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/visualization.py
@@ -0,0 +1,69 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+"""
+Visualization -- Predicting Breast Cancer Proliferation Scores with
+Apache SystemML
+
+This module contains functions for visualizing data for the breast
+cancer project.
+"""
+import matplotlib.pyplot as plt
+
+
+def visualize_tile(tile):
+  """
+  Plot a tissue tile.
+  
+  Args:
+    tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
+  
+  Returns:
+    None
+  """
+  plt.imshow(tile)
+  plt.show()
+
+
+def visualize_sample(sample, size=256):
+  """
+  Plot a tissue sample.
+  
+  Args:
+    sample: A square sample flattened to a vector of size
+      (channels*size_x*size_y).
+    size: The width and height of the square samples.
+  
+  Returns:
+    None
+  """
+  # Change type, reshape, transpose to (size_x, size_y, channels).
+  length = sample.shape[0]
+  channels = int(length / (size * size))
+  if channels > 1:
+    sample = sample.astype('uint8').reshape((channels, size, size)).transpose(1,2,0)
+    plt.imshow(sample)
+  else:
+    vmax = 255 if sample.max() > 1 else 1
+    sample = sample.reshape((size, size))
+    plt.imshow(sample, cmap="gray", vmin=0, vmax=vmax)
+  plt.show()
+

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e6555bbb/projects/breast_cancer/preprocess.py
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/preprocess.py b/projects/breast_cancer/preprocess.py
new file mode 100644
index 0000000..d789c97
--- /dev/null
+++ b/projects/breast_cancer/preprocess.py
@@ -0,0 +1,148 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+"""
+Preprocess -- Predicting Breast Cancer Proliferation Scores with
+Apache SystemML
+
+This script runs the preprocessing phase of the breast cancer project.
+"""
+import os
+import shutil
+
+import numpy as np
+
+from breastcancer.preprocessing import preprocess, save, train_val_split
+from pyspark.sql import SparkSession
+
+
+# Create new SparkSession
+spark = (SparkSession.builder
+                     .appName("Breast Cancer -- Preprocessing")
+                     .getOrCreate())
+
+# Ship a fresh copy of the `breastcancer` package to the Spark workers.
+# Note: The zip must include the `breastcancer` directory itself,
+# as well as all files within it for `addPyFile` to work correctly.
+# This is equivalent to `zip -r breastcancer.zip breastcancer`.
+dirname = "breastcancer"
+zipname = dirname + ".zip"
+shutil.make_archive(dirname, 'zip', dirname + "/..", dirname)
+spark.sparkContext.addPyFile(zipname)
+
+
+# Execute Preprocessing & Save
+
+# TODO: Filtering tiles and then cutting into samples could result
+# in samples with less tissue than desired, despite that being the
+# procedure of the paper.  Look into simply selecting tiles of the
+# desired size to begin with.
+
+# Get list of image numbers, minus the broken ones.
+broken = {2, 45, 91, 112, 242, 256, 280, 313, 329, 467}
+slide_nums = sorted(set(range(1,501)) - broken)
+
+# Settings
+training = True
+tile_size = 256
+sample_size = 256
+grayscale = False
+num_partitions = 20000
+add_row_indices = True
+train_frac = 0.8
+split_seed = 24
+folder = "/home/MDM/breast_cancer/data"
+save_folder = "data"  # Hadoop-supported directory in which to save DataFrames
+df_path = os.path.join(save_folder, "samples_{}_{}{}.parquet".format(
+    "labels" if training else "testing", sample_size, "_grayscale" if grayscale else ""))
+train_df_path = os.path.join(save_folder, "train_{}{}.parquet".format(sample_size,
+    "_grayscale" if grayscale else ""))
+val_df_path = os.path.join(save_folder, "val_{}{}.parquet".format(sample_size,
+    "_grayscale" if grayscale else ""))
+
+# Process all slides.
+df = preprocess(spark, slide_nums, tile_size=tile_size, sample_size=sample_size,
+                grayscale=grayscale, training=training, num_partitions=num_partitions,
+                folder=folder)
+
+# Save DataFrame of samples.
+save(df, df_path, sample_size, grayscale)
+
+# Load full DataFrame from disk.
+df = spark.read.load(df_path)
+
+# Split into train and validation DataFrames based On slide number
+train, val = train_val_split(spark, df, slide_nums, folder, train_frac, add_row_indices,
+                             seed=split_seed)
+
+# Save train and validation DataFrames.
+save(train, train_df_path, sample_size, grayscale)
+save(val, val_df_path, sample_size, grayscale)
+
+
+# ---
+#
+## Sample Data
+### TODO: Wrap this in a function with appropriate default arguments
+#
+## Load train and validation DataFrames from disk.
+#train = spark.read.load(train_df_path)
+#val = spark.read.load(val_df_path)
+#
+## Take a stratified sample.
+#p=0.01
+#train_sample = train.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
+#val_sample = val.drop("__INDEX").sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=42)
+#
+## Reassign row indices.
+## TODO: Wrap this in a function with appropriate default arguments.
+#train_sample = (
+#  train_sample.rdd
+#              .zipWithIndex()
+#              .map(lambda r: (r[1] + 1, *r[0]))
+#              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
+#train_sample = train_sample.select(train_sample["__INDEX"].astype("int"),
+#                                   train_sample.slide_num.astype("int"),
+#                                   train_sample.tumor_score.astype("int"),
+#                                   train_sample.molecular_score,
+#                                   train_sample["sample"])
+#
+#val_sample = (
+#  val_sample.rdd
+#            .zipWithIndex()
+#            .map(lambda r: (r[1] + 1, *r[0]))
+#            .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
+#val_sample = val_sample.select(val_sample["__INDEX"].astype("int"),
+#                               val_sample.slide_num.astype("int"),
+#                               val_sample.tumor_score.astype("int"),
+#                               val_sample.molecular_score,
+#                               val_sample["sample"])
+#
+## Save train and validation DataFrames.
+#tr_sample_filename = "train_{}_sample_{}{}.parquet".format(p, sample_size,
+#    "_grayscale" if grayscale else "")
+#val_sample_filename = "val_{}_sample_{}{}.parquet".format(p, sample_size,
+#    "_grayscale" if grayscale else "")
+#train_sample_path = os.path.join("save_folder", tr_sample_filename)
+#val_sample_path = os.path.join("save_folder", val_sample_filename)
+#save(train_sample, train_sample_path, sample_size, grayscale)
+#save(val_sample, val_sample_path, sample_size, grayscale)
+