You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/04/01 03:48:55 UTC

incubator-systemml git commit: [SYSTEMML-1185] Updating Preprocessing Notebook

Repository: incubator-systemml
Updated Branches:
  refs/heads/master a6d7aa549 -> 420dd17be

[SYSTEMML-1185] Updating Preprocessing Notebook

Updating the breast cancer preprocessing notebook with a new function
for splitting the full DataFrame into train and validation DataFrames.


Branch: refs/heads/master
Commit: 420dd17bee3adf5569cc848c7f0d62e3923bd769
Parents: a6d7aa5
Author: Mike Dusenberry <>
Authored: Fri Mar 31 20:47:35 2017 -0700
Committer: Mike Dusenberry <>
Committed: Fri Mar 31 20:47:35 2017 -0700

 projects/breast_cancer/Preprocessing.ipynb | 326 ++++++++++++++----------
 1 file changed, 187 insertions(+), 139 deletions(-)
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index e5690a9..eb107cf 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -567,27 +567,55 @@
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
+    "collapsed": true,
     "deletable": true,
     "editable": true
    "outputs": [],
    "source": [
-    "def create_ground_truth_maps(folder):\n",
+    "def get_labels_df(folder):\n",
     "  \"\"\"\n",
-    "  Create lookup maps for ground truth labels.\n",
+    "  Create a DataFrame with the ground truth labels for each slide.\n",
     "  \n",
     "  Args:\n",
-    "    folder: Directory in which the slides folder is stored, as a string.\n",
-    "      This should contain a `training_ground_truth.csv` file.\n",
+    "    folder: Directory containing a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide.\n",
+    "\n",
+    "  Returns:\n",
+    "    A Pandas DataFrame containing the ground truth labels for each slide.\n",
     "  \"\"\"\n",
     "  filename = os.path.join(folder, \"training_ground_truth.csv\")\n",
     "  labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
     "  labels[\"slide_num\"] = range(1, 501)\n",
+    "  return labels"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def create_ground_truth_maps(labels_df):\n",
+    "  \"\"\"\n",
+    "  Create lookup maps for ground truth labels.\n",
+    "  \n",
+    "  Args:\n",
+    "    labels_df: A Pandas DataFrame containing the ground truth labels for\n",
+    "      each slide.\n",
+    "  Returns:\n",
+    "    A tuple of dictionaries mapping from the slide number to the\n",
+    "    tumor score and to the molecular score.\n",
+    "  \"\"\"\n",
     "  # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries.\n",
-    "  tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n",
-    "  molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n",
+    "  tumor_score_dict = {int(s): int(l) for s,l in zip(labels_df.slide_num, labels_df.tumor_score)}\n",
+    "  molecular_score_dict = {int(s): float(l) for s,l in zip(labels_df.slide_num, labels_df.molecular_score)}\n",
     "  return tumor_score_dict, molecular_score_dict"
@@ -598,14 +626,14 @@
     "editable": true
    "source": [
-    "# Process All Slides Into A Saved Spark DataFrame"
+    "# Process All Slides Into A Spark DataFrame"
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
-    "collapsed": false,
+    "collapsed": true,
     "deletable": true,
     "editable": true
@@ -624,10 +652,13 @@
     "  \n",
     "  Args:\n",
     "    slide_nums: List of whole-slide numbers to process.\n",
-    "    folder: Local directory in which the slides folder is stored, as a string.\n",
-    "      This should contain either a `training_image_data` folder with\n",
-    "      images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
-    "      folder with images in the format `TUPAC-TE-###.svs`.\n",
+    "    folder: Local directory in which the slides folder and ground truth\n",
+    "      file is stored, as a string. This should contain a\n",
+    "      `training_image_data` folder with images in the format\n",
+    "      `TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide.  Alternatively, the folder should contain a\n",
+    "      `testing_image_data` folder with images in the format `TUPAC-TE-###.svs`.\n",
     "    training: Boolean for training or testing datasets.\n",
     "    tile_size: The width and height of a square tile to be generated.\n",
     "    overlap: Number of pixels by which to overlap the tiles.\n",
@@ -659,7 +690,8 @@
     "  filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
     "  samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
     "  if training:\n",
-    "    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)\n",
+    "    labels_df = get_labels_df(folder)\n",
+    "    tumor_score_dict, molecular_score_dict = create_ground_truth_maps(labels_df)\n",
     "    samples_with_labels = (samples.map(\n",
     "        lambda tup: (tup[0], tumor_score_dict[tup[0]],\n",
     "                     molecular_score_dict[tup[0]], Vectors.dense(tup[1]))))\n",
@@ -670,8 +702,145 @@
     "    df = samples.toDF([\"slide_num\", \"sample\"])\n",
     "    df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
     "  #df = df.repartition(num_partitions)  # Even out the partitions\n",
-    "  return df\n",
+    "  return df"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": true,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
+    "def train_val_split(df, slide_nums, folder, add_row_indices):\n",
+    "  \"\"\"\n",
+    "  Save a preprocessed DataFrame with a constraint on the file sizes.\n",
+    "  \n",
+    "  Args:\n",
+    "    df: A DataFrame.\n",
+    "    slide_nums: A list of slide numbers to sample from.\n",
+    "    folder: Directory containing a `training_ground_truth.csv` file\n",
+    "      containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+    "      labels for each slide.\n",
+    "    add_row_indices: Boolean for whether or not to prepend an index\n",
+    "      column contain the row index for use downstream by SystemML.\n",
+    "      The column name will be \"__INDEX\".\n",
+    "    \n",
+    "    sample_size: The width and height of the square samples.\n",
+    "    grayscale: Whether or not to the samples are in grayscale format, rather\n",
+    "      than RGB.\n",
+    "    folder: HDFS directory in which to save the DataFrame.\n",
+    "    mode: Specifies the behavior of `df.write.mode` when the data already exists.\n",
+    "      Options include:\n",
+    "        * `append`: Append contents of this :class:`DataFrame` to existing data.\n",
+    "        * `overwrite`: Overwrite existing data.\n",
+    "        * `error`: Throw an exception if data already exists.\n",
+    "        * `ignore`: Silently ignore this operation if data already exists.\n",
+    "    format: The format in which to save the DataFrame.\n",
+    "    file_size: Size in MB of each saved file.  128 MB is an empirically ideal size.\n",
+    "    \n",
+    "    Returns:\n",
+    "      A DataFrame in which each row contains the slide number, tumor score,\n",
+    "      molecular score, and the sample stretched out into a Vector.\n",
+    "  \"\"\"\n",
+    "  # Create DataFrame of labels.\n",
+    "  labels_df = get_labels_df(folder)\n",
+    "\n",
+    "  # Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
+    "  # and merge with labels.\n",
+    "  slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
+    "  labeled_slide_nums_df = pd.merge(slide_nums_df, labels_df, how=\"inner\", on=\"slide_num\")\n",
+    "\n",
+    "  # DEBUG: Examine class distribution.\n",
+    "#   for pdf in [labels_df, labeled_slide_nums_df]:\n",
+    "#     print(pdf.count())\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
+    "#     print()\n",
+    "    \n",
+    "  # Randomly split slides 80%/20% into train and validation sets.\n",
+    "  train_nums_df = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
+    "  val_nums_df = labeled_slide_nums_df.drop(train_nums_df.index)\n",
+    "\n",
+    "  train_nums = (spark.createDataFrame(train_nums_df)\n",
+    "                     .selectExpr(\"cast(slide_num as int)\")\n",
+    "                     .coalesce(1))\n",
+    "  val_nums = (spark.createDataFrame(val_nums_df)\n",
+    "                   .selectExpr(\"cast(slide_num as int)\")\n",
+    "                   .coalesce(1))\n",
+    "  # Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
+    "  # in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
+    "  # rather than the costly SortMergeJoin.\n",
+    "  train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
+    "  val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
+    "  \n",
+    "  # DEBUG: Sanity checks.\n",
+    "#   assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
+    "#   assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
+    "#   assert train.join(val, on=\"slide_num\").count() == 0\n",
+    "#   #  - Check distributions.\n",
+    "#   for pdf in train_nums_df, val_nums_df:\n",
+    "#     print(pdf.count())\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+    "#     print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
+    "#   #  - Check total number of examples in each.\n",
+    "#   print(train.count(), val.count())\n",
+    "#   #  - Check physical plans for broadcast join.\n",
+    "#   print(train.explain(), val.explain())\n",
+    "  \n",
+    "  # Add row indices for use with SystemML.\n",
+    "  if add_row_indices:\n",
+    "    train = (train.rdd\n",
+    "                  .zipWithIndex()\n",
+    "                  .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
+    "                  .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+    "    train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
+    "                         train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
+    "\n",
+    "    val = (val.rdd\n",
+    "              .zipWithIndex()\n",
+    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
+    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+    "    val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
+    "                     val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
+    "\n",
+    "  return train, val"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "deletable": true,
+    "editable": true
+   },
+   "source": [
+    "# Save"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "collapsed": false,
+    "deletable": true,
+    "editable": true
+   },
+   "outputs": [],
+   "source": [
     "def save(df, filename, sample_size=256, grayscale=False, folder=\"data\",\n",
     "         mode=\"error\", format=\"parquet\", file_size=128):\n",
     "  \"\"\"\n",
@@ -756,6 +925,7 @@
     "sample_size = 256\n",
     "grayscale = False\n",
     "num_partitions = 20000\n",
+    "add_row_indices = True\n",
     "folder = \"/home/MDM/breast_cancer/data\"\n",
     "filename = \"samples_{}_{}{}.parquet\".format(\n",
     "    \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\")\n",
@@ -794,26 +964,6 @@
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "metadata": {
-    "deletable": true,
-    "editable": true
-   },
-   "source": [
-    "### TODO: Wrap this in a function with appropriate default arguments"
-   ]
-  },
-  {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {
@@ -834,114 +984,12 @@
    "metadata": {
     "collapsed": false,
     "deletable": true,
-    "editable": true,
-    "scrolled": false
-   },
-   "outputs": [],
-   "source": [
-    "# Create DataFrame of labels.\n",
-    "labels = pd.read_csv(\n",
-    "    \"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
-    "labels[\"slide_num\"] = range(1, 501)  # add slide num column\n",
-    "\n",
-    "# Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
-    "# and merge with labels.\n",
-    "slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
-    "labeled_slide_nums_df = pd.merge(slide_nums_df, labels, how=\"inner\", on=\"slide_num\")\n",
-    "\n",
-    "# Examine class distribution.\n",
-    "for pdf in [labels, labeled_slide_nums_df]:\n",
-    "  print(pdf.count())\n",
-    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
-    "  print()"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "# Randomly split slides 80%/20% into train and validation sets.\n",
-    "train_nums_pdf = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
-    "val_nums_pdf = labeled_slide_nums_df.drop(train_nums_pdf.index)\n",
-    "\n",
-    "train_nums = (spark.createDataFrame(train_nums_pdf)\n",
-    "                   .selectExpr(\"cast(slide_num as int)\")\n",
-    "                   .coalesce(1))\n",
-    "val_nums = (spark.createDataFrame(val_nums_pdf)\n",
-    "                 .selectExpr(\"cast(slide_num as int)\")\n",
-    "                 .coalesce(1))\n",
-    "\n",
-    "# Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
-    "# in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
-    "# rather than the costly SortMergeJoin.\n",
-    "train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
-    "val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
-    "\n",
-    "train, val"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
-    "editable": true
-   },
-   "outputs": [],
-   "source": [
-    "# Sanity checks.\n",
-    "assert len(pd.merge(train_nums_pdf, val_nums_pdf, on=\"slide_num\")) == 0\n",
-    "assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
-    "assert train.join(val, on=\"slide_num\").count() == 0\n",
-    "\n",
-    "# Check distributions.\n",
-    "for pdf in train_nums_pdf, val_nums_pdf:\n",
-    "  print(pdf.count())\n",
-    "  print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
-    "  print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
-    "\n",
-    "# Check total number of examples in each.\n",
-    "print(train.count(), val.count())\n",
-    "\n",
-    "# Check physical plans for broadcast join.\n",
-    "print(train.explain(), val.explain())"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "collapsed": false,
-    "deletable": true,
     "editable": true
    "outputs": [],
    "source": [
-    "# Add row indices for use with SystemML.\n",
-    "# TODO: Wrap this in a function with appropriate default arguments.\n",
-    "train = (train.rdd\n",
-    "              .zipWithIndex()\n",
-    "              .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "              .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
-    "                     train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
-    "\n",
-    "val = (val.rdd\n",
-    "          .zipWithIndex()\n",
-    "          .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing\n",
-    "          .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
-    "val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
-    "                 val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
-    "\n",
-    "train, val"
+    "# Split into train and validation DataFrames based On slide number\n",
+    "train, val = train_val_split(df, slide_nums, folder, add_row_indices)"