You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2017/04/01 03:48:55 UTC
incubator-systemml git commit: [SYSTEMML-1185] Updating Preprocessing
Notebook
Repository: incubator-systemml
Updated Branches:
refs/heads/master a6d7aa549 -> 420dd17be
[SYSTEMML-1185] Updating Preprocessing Notebook
Updating the breast cancer preprocessing notebook with a new function
for splitting the full DataFrame into train and validation DataFrames.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/420dd17b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/420dd17b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/420dd17b
Branch: refs/heads/master
Commit: 420dd17bee3adf5569cc848c7f0d62e3923bd769
Parents: a6d7aa5
Author: Mike Dusenberry <mw...@us.ibm.com>
Authored: Fri Mar 31 20:47:35 2017 -0700
Committer: Mike Dusenberry <mw...@us.ibm.com>
Committed: Fri Mar 31 20:47:35 2017 -0700
----------------------------------------------------------------------
projects/breast_cancer/Preprocessing.ipynb | 326 ++++++++++++++----------
1 file changed, 187 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/420dd17b/projects/breast_cancer/Preprocessing.ipynb
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/Preprocessing.ipynb b/projects/breast_cancer/Preprocessing.ipynb
index e5690a9..eb107cf 100644
--- a/projects/breast_cancer/Preprocessing.ipynb
+++ b/projects/breast_cancer/Preprocessing.ipynb
@@ -567,27 +567,55 @@
"cell_type": "code",
"execution_count": null,
"metadata": {
- "collapsed": false,
+ "collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
- "def create_ground_truth_maps(folder):\n",
+ "def get_labels_df(folder):\n",
" \"\"\"\n",
- " Create lookup maps for ground truth labels.\n",
+ " Create a DataFrame with the ground truth labels for each slide.\n",
" \n",
" Args:\n",
- " folder: Directory in which the slides folder is stored, as a string.\n",
- " This should contain a `training_ground_truth.csv` file.\n",
+ " folder: Directory containing a `training_ground_truth.csv` file\n",
+ " containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+ " labels for each slide.\n",
+ "\n",
+ " Returns:\n",
+ " A Pandas DataFrame containing the ground truth labels for each slide.\n",
" \"\"\"\n",
" filename = os.path.join(folder, \"training_ground_truth.csv\")\n",
" labels = pd.read_csv(filename, names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
" labels[\"slide_num\"] = range(1, 501)\n",
+ " return labels"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "collapsed": false,
+ "deletable": true,
+ "editable": true
+ },
+ "outputs": [],
+ "source": [
+ "def create_ground_truth_maps(labels_df):\n",
+ " \"\"\"\n",
+ " Create lookup maps for ground truth labels.\n",
+ " \n",
+ " Args:\n",
+ " labels_df: A Pandas DataFrame containing the ground truth labels for\n",
+ " each slide.\n",
"\n",
+ " Returns:\n",
+ " A tuple of dictionaries mapping from the slide number to the\n",
+ " tumor score and to the molecular score.\n",
+ " \"\"\"\n",
" # Create slide_num -> tumor_score, and slide_num -> molecular_score dictionaries.\n",
- " tumor_score_dict = {int(s): int(l) for s,l in zip(labels.slide_num, labels.tumor_score)}\n",
- " molecular_score_dict = {int(s): float(l) for s,l in zip(labels.slide_num, labels.molecular_score)}\n",
+ " tumor_score_dict = {int(s): int(l) for s,l in zip(labels_df.slide_num, labels_df.tumor_score)}\n",
+ " molecular_score_dict = {int(s): float(l) for s,l in zip(labels_df.slide_num, labels_df.molecular_score)}\n",
" return tumor_score_dict, molecular_score_dict"
]
},
@@ -598,14 +626,14 @@
"editable": true
},
"source": [
- "# Process All Slides Into A Saved Spark DataFrame"
+ "# Process All Slides Into A Spark DataFrame"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
- "collapsed": false,
+ "collapsed": true,
"deletable": true,
"editable": true
},
@@ -624,10 +652,13 @@
" \n",
" Args:\n",
" slide_nums: List of whole-slide numbers to process.\n",
- " folder: Local directory in which the slides folder is stored, as a string.\n",
- " This should contain either a `training_image_data` folder with\n",
- " images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`\n",
- " folder with images in the format `TUPAC-TE-###.svs`.\n",
+ " folder: Local directory in which the slides folder and ground truth\n",
+ " file is stored, as a string. This should contain a\n",
+ " `training_image_data` folder with images in the format\n",
+ " `TUPAC-TR-###.svs`, as well as a `training_ground_truth.csv` file\n",
+ " containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+ " labels for each slide. Alternatively, the folder should contain a\n",
+ " `testing_image_data` folder with images in the format `TUPAC-TE-###.svs`.\n",
" training: Boolean for training or testing datasets.\n",
" tile_size: The width and height of a square tile to be generated.\n",
" overlap: Number of pixels by which to overlap the tiles.\n",
@@ -659,7 +690,8 @@
" filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))\n",
" samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))\n",
" if training:\n",
- " tumor_score_dict, molecular_score_dict = create_ground_truth_maps(folder)\n",
+ " labels_df = get_labels_df(folder)\n",
+ " tumor_score_dict, molecular_score_dict = create_ground_truth_maps(labels_df)\n",
" samples_with_labels = (samples.map(\n",
" lambda tup: (tup[0], tumor_score_dict[tup[0]],\n",
" molecular_score_dict[tup[0]], Vectors.dense(tup[1]))))\n",
@@ -670,8 +702,145 @@
" df = samples.toDF([\"slide_num\", \"sample\"])\n",
" df = df.select(df.slide_num.astype(\"int\"), df[\"sample\"])\n",
" #df = df.repartition(num_partitions) # Even out the partitions\n",
- " return df\n",
+ " return df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "deletable": true,
+ "editable": true
+ },
+ "source": [
+ "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "collapsed": true,
+ "deletable": true,
+ "editable": true
+ },
+ "outputs": [],
+ "source": [
+ "def train_val_split(df, slide_nums, folder, add_row_indices):\n",
+ " \"\"\"\n",
+ " Split a DataFrame of samples into train and validation DataFrames.\n",
+ " \n",
+ " The slide numbers in `slide_nums` that have ground truth labels are\n",
+ " randomly split 80%/20% into train and validation sets, and the\n",
+ " samples in `df` are partitioned accordingly, so all samples from a\n",
+ " given slide end up in the same set.\n",
+ " \n",
+ " Args:\n",
+ " df: A DataFrame of samples, in which each row contains the slide\n",
+ " number, tumor score, molecular score, and the sample stretched\n",
+ " out into a Vector.\n",
+ " slide_nums: A list of slide numbers to sample from.\n",
+ " folder: Directory containing a `training_ground_truth.csv` file\n",
+ " containing the ground truth \"tumor_score\" and \"molecular_score\"\n",
+ " labels for each slide.\n",
+ " add_row_indices: Boolean for whether or not to prepend an index\n",
+ " column containing the row index for use downstream by SystemML.\n",
+ " The column name will be \"__INDEX\", and the indices are 1-based\n",
+ " and contiguous within each of the train and validation DataFrames.\n",
+ " \n",
+ " Returns:\n",
+ " A tuple of (train, val) DataFrames. Each row contains the slide\n",
+ " number, tumor score, molecular score, and the sample stretched out\n",
+ " into a Vector, preceded by the \"__INDEX\" row index column if\n",
+ " `add_row_indices` is True.\n",
+ " \n",
+ " Note: The slide-level split is deterministic, since a fixed\n",
+ " `random_state` is used when sampling the slides.\n",
+ " \"\"\"\n",
+ " # Create DataFrame of labels.\n",
+ " labels_df = get_labels_df(folder)\n",
+ "\n",
+ " # Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
+ " # and merge with labels.\n",
+ " slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
+ " labeled_slide_nums_df = pd.merge(slide_nums_df, labels_df, how=\"inner\", on=\"slide_num\")\n",
+ "\n",
+ " # DEBUG: Examine class distribution.\n",
+ "# for pdf in [labels_df, labeled_slide_nums_df]:\n",
+ "# print(pdf.count())\n",
+ "# print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+ "# print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
+ "# print()\n",
+ " \n",
+ " # Randomly split slides 80%/20% into train and validation sets.\n",
+ " train_nums_df = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
+ " val_nums_df = labeled_slide_nums_df.drop(train_nums_df.index)\n",
+ "\n",
+ " train_nums = (spark.createDataFrame(train_nums_df)\n",
+ " .selectExpr(\"cast(slide_num as int)\")\n",
+ " .coalesce(1))\n",
+ " val_nums = (spark.createDataFrame(val_nums_df)\n",
+ " .selectExpr(\"cast(slide_num as int)\")\n",
+ " .coalesce(1))\n",
"\n",
+ " # Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
+ " # in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
+ " # rather than the costly SortMergeJoin.\n",
+ " train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
+ " val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
+ " \n",
+ " # DEBUG: Sanity checks.\n",
+ "# assert len(pd.merge(train_nums_df, val_nums_df, on=\"slide_num\")) == 0\n",
+ "# assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
+ "# assert train.join(val, on=\"slide_num\").count() == 0\n",
+ "# # - Check distributions.\n",
+ "# for pdf in train_nums_df, val_nums_df:\n",
+ "# print(pdf.count())\n",
+ "# print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
+ "# print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
+ "# # - Check total number of examples in each.\n",
+ "# print(train.count(), val.count())\n",
+ "# # - Check physical plans for broadcast join.\n",
+ "# print(train.explain(), val.explain())\n",
+ " \n",
+ " # Add row indices for use with SystemML.\n",
+ " if add_row_indices:\n",
+ " train = (train.rdd\n",
+ " .zipWithIndex()\n",
+ " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n",
+ " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+ " train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
+ " train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
+ "\n",
+ " val = (val.rdd\n",
+ " .zipWithIndex()\n",
+ " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n",
+ " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
+ " val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
+ " val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
+ "\n",
+ " return train, val"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "deletable": true,
+ "editable": true
+ },
+ "source": [
+ "# Save"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "collapsed": false,
+ "deletable": true,
+ "editable": true
+ },
+ "outputs": [],
+ "source": [
"def save(df, filename, sample_size=256, grayscale=False, folder=\"data\",\n",
" mode=\"error\", format=\"parquet\", file_size=128):\n",
" \"\"\"\n",
@@ -756,6 +925,7 @@
"sample_size = 256\n",
"grayscale = False\n",
"num_partitions = 20000\n",
+ "add_row_indices = True\n",
"folder = \"/home/MDM/breast_cancer/data\"\n",
"filename = \"samples_{}_{}{}.parquet\".format(\n",
" \"labels\" if training else \"testing\", sample_size, \"_grayscale\" if grayscale else \"\")\n",
@@ -794,26 +964,6 @@
]
},
{
- "cell_type": "markdown",
- "metadata": {
- "deletable": true,
- "editable": true
- },
- "source": [
- "# Split Into Separate Train & Validation DataFrames Based On Slide Number"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {
- "deletable": true,
- "editable": true
- },
- "source": [
- "### TODO: Wrap this in a function with appropriate default arguments"
- ]
- },
- {
"cell_type": "code",
"execution_count": null,
"metadata": {
@@ -834,114 +984,12 @@
"metadata": {
"collapsed": false,
"deletable": true,
- "editable": true,
- "scrolled": false
- },
- "outputs": [],
- "source": [
- "# Create DataFrame of labels.\n",
- "labels = pd.read_csv(\n",
- " \"data/training_ground_truth.csv\", names=[\"tumor_score\",\"molecular_score\"], header=None)\n",
- "labels[\"slide_num\"] = range(1, 501) # add slide num column\n",
- "\n",
- "# Create DataFrames of the slide numbers being used (i.e. without the broken ones)\n",
- "# and merge with labels.\n",
- "slide_nums_df = pd.DataFrame(slide_nums, columns=[\"slide_num\"])\n",
- "labeled_slide_nums_df = pd.merge(slide_nums_df, labels, how=\"inner\", on=\"slide_num\")\n",
- "\n",
- "# Examine class distribution.\n",
- "for pdf in [labels, labeled_slide_nums_df]:\n",
- " print(pdf.count())\n",
- " print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
- " print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False))\n",
- " print()"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "deletable": true,
- "editable": true
- },
- "outputs": [],
- "source": [
- "# Randomly split slides 80%/20% into train and validation sets.\n",
- "train_nums_pdf = labeled_slide_nums_df.sample(frac=0.8, random_state=24)\n",
- "val_nums_pdf = labeled_slide_nums_df.drop(train_nums_pdf.index)\n",
- "\n",
- "train_nums = (spark.createDataFrame(train_nums_pdf)\n",
- " .selectExpr(\"cast(slide_num as int)\")\n",
- " .coalesce(1))\n",
- "val_nums = (spark.createDataFrame(val_nums_pdf)\n",
- " .selectExpr(\"cast(slide_num as int)\")\n",
- " .coalesce(1))\n",
- "\n",
- "# Note: Explicitly mark the smaller DataFrames as able to be broadcasted\n",
- "# in order to have Catalyst choose the more efficient BroadcastHashJoin, \n",
- "# rather than the costly SortMergeJoin.\n",
- "train = df.join(F.broadcast(train_nums), on=\"slide_num\")\n",
- "val = df.join(F.broadcast(val_nums), on=\"slide_num\")\n",
- "\n",
- "train, val"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "deletable": true,
- "editable": true
- },
- "outputs": [],
- "source": [
- "# Sanity checks.\n",
- "assert len(pd.merge(train_nums_pdf, val_nums_pdf, on=\"slide_num\")) == 0\n",
- "assert train_nums.join(val_nums, on=\"slide_num\").count() == 0\n",
- "assert train.join(val, on=\"slide_num\").count() == 0\n",
- "\n",
- "# Check distributions.\n",
- "for pdf in train_nums_pdf, val_nums_pdf:\n",
- " print(pdf.count())\n",
- " print(pdf[\"tumor_score\"].value_counts(sort=False))\n",
- " print(pdf[\"tumor_score\"].value_counts(normalize=True, sort=False), \"\\n\")\n",
- "\n",
- "# Check total number of examples in each.\n",
- "print(train.count(), val.count())\n",
- "\n",
- "# Check physical plans for broadcast join.\n",
- "print(train.explain(), val.explain())"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {
- "collapsed": false,
- "deletable": true,
"editable": true
},
"outputs": [],
"source": [
- "# Add row indices for use with SystemML.\n",
- "# TODO: Wrap this in a function with appropriate default arguments.\n",
- "train = (train.rdd\n",
- " .zipWithIndex()\n",
- " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n",
- " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
- "train = train.select(train[\"__INDEX\"].astype(\"int\"), train.slide_num.astype(\"int\"), \n",
- " train.tumor_score.astype(\"int\"), train.molecular_score, train[\"sample\"])\n",
- "\n",
- "val = (val.rdd\n",
- " .zipWithIndex()\n",
- " .map(lambda r: (r[1] + 1, *r[0])) # flatten & convert index to 1-based indexing\n",
- " .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))\n",
- "val = val.select(val[\"__INDEX\"].astype(\"int\"), val.slide_num.astype(\"int\"),\n",
- " val.tumor_score.astype(\"int\"), val.molecular_score, val[\"sample\"])\n",
- "\n",
- "train, val"
+ "# Split into train and validation DataFrames based on slide number\n",
+ "train, val = train_val_split(df, slide_nums, folder, add_row_indices)"
]
},
{