You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by du...@apache.org on 2017/05/22 23:41:50 UTC

incubator-systemml git commit: [SYSTEMML-1465] Add stain normalization to breast cancer preprocessing.

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3d1f77ce2 -> e93fdd35b


[SYSTEMML-1465] Add stain normalization to breast cancer preprocessing.

Adding stain normalization of H&E histology slides for the breast cancer
project.

Closes #507.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e93fdd35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e93fdd35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e93fdd35

Branch: refs/heads/master
Commit: e93fdd35b82e95fd41023850939593d5d37c8512
Parents: 3d1f77c
Author: Mike Dusenberry <mw...@us.ibm.com>
Authored: Mon May 22 16:39:09 2017 -0700
Committer: Mike Dusenberry <mw...@us.ibm.com>
Committed: Mon May 22 16:39:09 2017 -0700

----------------------------------------------------------------------
 .../breast_cancer/breastcancer/preprocessing.py | 261 ++++++++++++++-----
 1 file changed, 200 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e93fdd35/projects/breast_cancer/breastcancer/preprocessing.py
----------------------------------------------------------------------
diff --git a/projects/breast_cancer/breastcancer/preprocessing.py b/projects/breast_cancer/breastcancer/preprocessing.py
index 424d699..cfedf94 100644
--- a/projects/breast_cancer/breastcancer/preprocessing.py
+++ b/projects/breast_cancer/breastcancer/preprocessing.py
@@ -47,7 +47,7 @@ from skimage.morphology import binary_closing, binary_dilation, disk
 def open_slide(slide_num, folder, training):
   """
   Open a whole-slide image, given an image number.
-  
+
   Args:
     slide_num: Slide image number as an integer.
     folder: Directory in which the slides folder is stored, as a string.
@@ -55,7 +55,7 @@ def open_slide(slide_num, folder, training):
       images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
       folder with images in the format `TUPAC-TE-###.svs`.
     training: Boolean for training or testing datasets.
-  
+
   Returns:
     An OpenSlide object representing a whole-slide image.
   """
@@ -75,15 +75,15 @@ def open_slide(slide_num, folder, training):
 def create_tile_generator(slide, tile_size, overlap):
   """
   Create a tile generator for the given slide.
-  
+
   This generator is able to extract tiles from the overall
   whole-slide image.
-  
+
   Args:
     slide: An OpenSlide object representing a whole-slide image.
     tile_size: The width and height of a square tile to be generated.
     overlap: Number of pixels by which to overlap the tiles.
-  
+
   Returns:
     A DeepZoomGenerator object representing the tile generator. Each
     extracted tile is a PIL Image with shape
@@ -100,18 +100,18 @@ def create_tile_generator(slide, tile_size, overlap):
 def get_20x_zoom_level(slide, generator):
   """
   Return the zoom level that corresponds to a 20x magnification.
-  
+
   The generator can extract tiles from multiple zoom levels,
   downsampling by a factor of 2 per level from highest to lowest
   resolution.
-  
+
   Args:
     slide: An OpenSlide object representing a whole-slide image.
     generator: A DeepZoomGenerator object representing a tile generator.
       Note: This generator is not a true "Python generator function",
       but rather is an object that is capable of extracting individual
       tiles.
-  
+
   Returns:
     Zoom level corresponding to a 20x magnification, or as close as
     possible.
@@ -138,11 +138,11 @@ def get_20x_zoom_level(slide, generator):
 def process_slide(slide_num, folder, training, tile_size, overlap):
   """
   Generate all possible tile indices for a whole-slide image.
-  
+
   Given a slide number, tile size, and overlap, generate
   all possible (slide_num, tile_size, overlap, zoom_level, col, row)
   indices.
-  
+
   Args:
     slide_num: Slide image number as an integer.
     folder: Directory in which the slides folder is stored, as a string.
@@ -152,7 +152,7 @@ def process_slide(slide_num, folder, training, tile_size, overlap):
     training: Boolean for training or testing datasets.
     tile_size: The width and height of a square tile to be generated.
     overlap: Number of pixels by which to overlap the tiles.
-  
+
   Returns:
     A list of (slide_num, tile_size, overlap, zoom_level, col, row)
     integer index tuples representing possible tiles to extract.
@@ -175,10 +175,10 @@ def process_slide(slide_num, folder, training, tile_size, overlap):
 def process_tile_index(tile_index, folder, training):
   """
   Generate a tile from a tile index.
-  
+
   Given a (slide_num, tile_size, overlap, zoom_level, col, row) tile
   index, generate a (slide_num, tile) tuple.
-  
+
   Args:
     tile_index: A (slide_num, tile_size, overlap, zoom_level, col, row)
       integer index tuple representing a tile to extract.
@@ -187,7 +187,7 @@ def process_tile_index(tile_index, folder, training):
       images in the format `TUPAC-TR-###.svs`, or a `testing_image_data`
       folder with images in the format `TUPAC-TE-###.svs`.
     training: Boolean for training or testing datasets.
-  
+
   Returns:
     A (slide_num, tile) tuple, where slide_num is an integer, and tile
     is a 3D NumPy array of shape (tile_size, tile_size, channels) in
@@ -199,7 +199,7 @@ def process_tile_index(tile_index, folder, training):
   # Create tile generator.
   generator = create_tile_generator(slide, tile_size, overlap)
   # Generate tile.
-  tile = np.array(generator.get_tile(zoom_level, (col, row)))
+  tile = np.asarray(generator.get_tile(zoom_level, (col, row)))
   return (slide_num, tile)
 
 
@@ -208,10 +208,10 @@ def process_tile_index(tile_index, folder, training):
 def optical_density(tile):
   """
   Convert a tile to optical density values.
-  
+
   Args:
     tile: A 3D NumPy array of shape (tile_size, tile_size, channels).
-  
+
   Returns:
     A 3D NumPy array of shape (tile_size, tile_size, channels)
     representing optical density values.
@@ -225,20 +225,20 @@ def optical_density(tile):
 def keep_tile(tile_tuple, tile_size, tissue_threshold):
   """
   Determine if a tile should be kept.
-  
+
   This filters out tiles based on size and a tissue percentage
   threshold, using a custom algorithm. If a tile has height &
   width equal to (tile_size, tile_size), and contains greater
   than or equal to the given percentage, then it will be kept;
   otherwise it will be filtered out.
-  
+
   Args:
     tile_tuple: A (slide_num, tile) tuple, where slide_num is an
-      integer, and tile is a 3D NumPy array of shape 
+      integer, and tile is a 3D NumPy array of shape
       (tile_size, tile_size, channels) in RGB format.
     tile_size: The width and height of a square tile to be generated.
     tissue_threshold: Tissue percentage threshold.
-  
+
   Returns:
     A Boolean indicating whether or not a tile should be kept for
     future usage.
@@ -246,7 +246,7 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
   slide_num, tile = tile_tuple
   if tile.shape[0:2] == (tile_size, tile_size):
     tile_orig = tile
-    
+
     # Check 1
     # Convert 3D RGB image to 2D grayscale image, from
     # 0 (dense tissue) to 1 (plain background).
@@ -272,7 +272,7 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
     # Calculate percentage of tissue coverage.
     percentage = tile.mean()
     check1 = percentage >= tissue_threshold
-    
+
     # Check 2
     # Convert to optical density values
     tile = optical_density(tile_orig)
@@ -285,39 +285,36 @@ def keep_tile(tile_tuple, tile_size, tissue_threshold):
     tile = binary_fill_holes(tile)
     percentage = tile.mean()
     check2 = percentage >= tissue_threshold
-    
+
     return check1 and check2
   else:
     return False
 
 
-# Generate Flattened Samples From Tile
+# Generate Samples From Tile
 
 def process_tile(tile_tuple, sample_size, grayscale):
   """
   Process a tile into a group of smaller samples.
-  
+
   Cut up a tile into smaller blocks of sample_size x sample_size pixels,
-  change the shape of each sample from (H, W, channels) to 
+  change the shape of each sample from (H, W, channels) to
   (channels, H, W), then flatten each into a vector of length
   channels*H*W.
-  
+
   Args:
     tile_tuple: A (slide_num, tile) tuple, where slide_num is an
-      integer, and tile is a 3D NumPy array of shape 
+      integer, and tile is a 3D NumPy array of shape
       (tile_size, tile_size, channels).
     sample_size: The new width and height of the square samples to be
       generated.
     grayscale: Whether or not to generate grayscale samples, rather
       than RGB.
-  
+
   Returns:
     A list of (slide_num, sample) tuples representing cut up tiles,
-    where each sample has been transposed from
-    (sample_size_x, sample_size_y, channels) to
-    (channels, sample_size_x, sample_size_y),
-    and flattened to a vector of length
-    (channels*sample_size_x*sample_size_y).
+    where each sample is a 3D NumPy array of shape
+    (sample_size_x, sample_size_y, channels).
   """
   slide_num, tile = tile_tuple
   if grayscale:
@@ -332,24 +329,166 @@ def process_tile(tile_tuple, sample_size, grayscale):
   # (num_x, num_y, sample_size_x, sample_size_y, ch).
   # 3. Combine num_x and num_y into single axis, returning
   # (num_samples, sample_size_x, sample_size_y, ch).
-  # 4. Swap axes from (num_samples, sample_size_x, sample_size_y, ch) to
-  # (num_samples, ch, sample_size_x, sample_size_y).
-  # 5. Flatten samples into (num_samples, ch*sample_size_x*sample_size_y).
   samples = (tile.reshape((x // sample_size, sample_size, y // sample_size, sample_size, ch))
                  .swapaxes(1,2)
-                 .reshape((-1, sample_size, sample_size, ch))
-                 .transpose(0,3,1,2))
-  samples = samples.reshape(samples.shape[0], -1)
+                 .reshape((-1, sample_size, sample_size, ch)))
   samples = [(slide_num, sample) for sample in list(samples)]
   return samples
 
 
+# Normalize staining
+
+def normalize_staining(sample_tuple, beta=0.15, alpha=1, light_intensity=255):
+  """
+  Normalize the staining of H&E histology slides.
+
+  This function normalizes the staining of H&E histology slides.
+
+  References:
+    - Macenko, Marc, et al. "A method for normalizing histology slides
+    for quantitative analysis." Biomedical Imaging: From Nano to Macro,
+    2009.  ISBI'09. IEEE International Symposium on. IEEE, 2009.
+      - http://wwwx.cs.unc.edu/~mn/sites/default/files/macenko2009.pdf
+    - https://github.com/mitkovetta/staining-normalization
+
+  Args:
+    sample_tuple: A (slide_num, sample) tuple, where slide_num is an
+      integer, and sample is a 3D NumPy array of shape (H,W,C).
+
+  Returns:
+    A (slide_num, sample) tuple, where the sample is a 3D NumPy array
+    of shape (H,W,C) that has been stain normalized.
+  """
+  # Setup.
+  slide_num, sample = sample_tuple
+  x = np.asarray(sample)
+  h, w, c = x.shape
+  x = x.reshape(-1, c).astype(np.float64)  # shape (H*W, C)
+
+  # Reference stain vectors and stain saturations.  We will normalize all slides
+  # to these references.  To create these, grab the stain vectors and stain
+  # saturations from a desirable slide.
+
+  # Values in reference implementation for use with eigendecomposition approach, natural log,
+  # and `light_intensity=240`.
+  #stain_ref = np.array([0.5626, 0.2159, 0.7201, 0.8012, 0.4062, 0.5581]).reshape(3,2)
+  #max_sat_ref = np.array([1.9705, 1.0308]).reshape(2,1)
+
+  # SVD w/ log10, and `light_intensity=255`.
+  stain_ref = (np.array([0.54598845, 0.322116, 0.72385198, 0.76419107, 0.42182333, 0.55879629])
+                 .reshape(3,2))
+  max_sat_ref = np.array([0.82791151, 0.61137274]).reshape(2,1)
+
+  # Convert RGB to OD.
+  # Note: The original paper used log10, and the reference implementation used the natural log.
+  #OD = -np.log((x+1)/light_intensity)  # shape (H*W, C)
+  OD = -np.log10(x/light_intensity + 1e-8)
+
+  # Remove data with OD intensity less than beta.
+  # I.e. remove transparent pixels.
+  # Note: This needs to be checked per channel, rather than
+  # taking an average over all channels for a given pixel.
+  OD_thresh = OD[np.all(OD >= beta, 1), :]  # shape (K, C)
+
+  # Calculate eigenvectors.
+  # Note: We can either use eigenvector decomposition, or SVD.
+  #eigvals, eigvecs = np.linalg.eig(np.cov(OD_thresh.T))  # np.cov results in inf/nans
+  U, s, V = np.linalg.svd(OD_thresh, full_matrices=False)
+
+  # Extract two largest eigenvectors.
+  # Note: We swap the sign of the eigvecs here to be consistent
+  # with other implementations.  Both +/- eigvecs are valid, with
+  # the same eigenvalue, so this is okay.
+  #top_eigvecs = eigvecs[:, np.argsort(eigvals)[-2:]] * -1
+  top_eigvecs = V[0:2, :].T * -1  # shape (C, 2)
+
+  # Project thresholded optical density values onto plane spanned by
+  # 2 largest eigenvectors.
+  proj = np.dot(OD_thresh, top_eigvecs)  # shape (K, 2)
+
+  # Calculate angle of each point wrt the first plane direction.
+  # Note: the parameters are `np.arctan2(y, x)`
+  angles = np.arctan2(proj[:, 1], proj[:, 0])  # shape (K,)
+
+  # Find robust extremes (a and 100-a percentiles) of the angle.
+  min_angle = np.percentile(angles, alpha)
+  max_angle = np.percentile(angles, 100-alpha)
+
+  # Convert min/max vectors (extremes) back to optimal stains in OD space.
+  # This computes a set of axes for each angle onto which we can project
+  # the top eigenvectors.  This assumes that the projected values have
+  # been normalized to unit length.
+  extreme_angles = np.array(
+    [[np.cos(min_angle), np.cos(max_angle)],
+     [np.sin(min_angle), np.sin(max_angle)]]
+  )  # shape (2,2)
+  stains = np.dot(top_eigvecs, extreme_angles)  # shape (C, 2)
+
+  # Merge vectors with hematoxylin first, and eosin second, as a heuristic.
+  if stains[0, 0] < stains[0, 1]:
+    stains[:, [0, 1]] = stains[:, [1, 0]]  # swap columns
+
+  # Calculate saturations of each stain.
+  # Note: Here, we solve
+  #    OD = VS
+  #     S = V^{-1}OD
+  # where `OD` is the matrix of optical density values of our image,
+  # `V` is the matrix of stain vectors, and `S` is the matrix of stain
+  # saturations.  Since this is an overdetermined system, we use the
+  # least squares solver, rather than a direct solve.
+  sats, _, _, _ = np.linalg.lstsq(stains, OD.T)
+
+  # Normalize stain saturations to have same pseudo-maximum based on
+  # a reference max saturation.
+  max_sat = np.percentile(sats, 99, axis=1, keepdims=True)
+  sats = sats / max_sat * max_sat_ref
+
+  # Compute optimal OD values.
+  OD_norm = np.dot(stain_ref, sats)
+
+  # Recreate image.
+  # Note: If the image is immediately converted to uint8 with `.astype(np.uint8)`, it will
+  # not return the correct values due to the initital values being outside of [0,255].
+  # To fix this, we round to the nearest integer, and then clip to [0,255], which is the
+  # same behavior as Matlab.
+  #x_norm = np.exp(OD_norm) * light_intensity  # natural log approach
+  x_norm = 10**(-OD_norm) * light_intensity - 1e-8  # log10 approach
+  x_norm = np.clip(np.round(x_norm), 0, 255).astype(np.uint8)
+  x_norm = x_norm.astype(np.uint8)
+  x_norm = x_norm.T.reshape(h,w,c)
+  return (slide_num, x_norm)
+
+
+def flatten_sample(sample_tuple):
+  """
+  Flatten a (H,W,C) sample into a (C*H*W) row vector.
+
+  Transpose each sample from (H, W, channels) to (channels, H, W), then
+  flatten each into a vector of length channels*H*W.
+
+  Args:
+    sample_tuple: A (slide_num, sample) tuple, where slide_num is an
+      integer, and sample is a 3D NumPy array of shape (H,W,C).
+
+  Returns:
+    A (slide_num, sample) tuple, where the sample has been transposed
+    from (H,W,C) to (C,H,W), and flattened to a vector of length
+    (C*H*W).
+  """
+  slide_num, sample = sample_tuple
+  # 1. Swap axes from (sample_size_x, sample_size_y, ch) to
+  # (ch, sample_size_x, sample_size_y).
+  # 2. Flatten sample into (ch*sample_size_x*sample_size_y).
+  flattened_sample = sample.transpose(2,0,1).reshape(-1)
+  return (slide_num, flattened_sample)
+
+
 # Get Ground Truth Labels
 
 def get_labels_df(folder):
   """
   Create a DataFrame with the ground truth labels for each slide.
-  
+
   Args:
     folder: Directory containing a `training_ground_truth.csv` file
       containing the ground truth "tumor_score" and "molecular_score"
@@ -369,17 +508,18 @@ def get_labels_df(folder):
 # Process All Slides Into A Spark DataFrame
 
 def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024, overlap=0,
-               tissue_threshold=0.9, sample_size=256, grayscale=False, num_partitions=20000):
+               tissue_threshold=0.9, sample_size=256, grayscale=False, normalize_stains=True,
+               num_partitions=20000):
   """
   Preprocess a set of whole-slide images.
-  
+
   Preprocess a set of whole-slide images as follows:
     1. Tile the slides into tiles of size (tile_size, tile_size, 3).
     2. Filter the tiles to remove unnecessary tissue.
     3. Cut the remaining tiles into samples of size
        (sample_size, sample_size, ch), where `ch` is 1 if `grayscale`
        is true, or 3 otherwise.
-  
+
   Args:
     spark: SparkSession.
     slide_nums: List of whole-slide numbers to process.
@@ -399,8 +539,9 @@ def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024,
       generated.
     grayscale: Whether or not to generate grayscale samples, rather
       than RGB.
+    normalize_stains: Whether or not to apply stain normalization.
     num_partitions: Number of partitions to use during processing.
-  
+
   Returns:
     A Spark DataFrame in which each row contains the slide number, tumor
     score, molecular score, and the sample stretched out into a Vector.
@@ -418,17 +559,16 @@ def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024,
   #row_mb = tile_size * tile_size * channels * 8 / 1024 / 1024  # size of one row in MB
   #rows_per_part = round(part_size / row_mb)
   #num_parts = rows / rows_per_part
-  ## HACK: Force even partitioning by collecting and parallelizing -- for memory issues.
-  ## Note: This was a PySpark bug with a fix in the master branch now.
-  #tile_indices = tile_indices.collect()
-  #tile_indices = sc.parallelize(tile_indices, num_partitions)
-  ## END HACK
   tile_indices = tile_indices.repartition(num_partitions)
   tile_indices.cache()
-  # Extract all tiles into a DataFrame, filter, and cut into smaller samples.
+  # Extract all tiles into a DataFrame, filter, cut into smaller samples, apply stain
+  # normalization, and flatten.
   tiles = tile_indices.map(lambda tile_index: process_tile_index(tile_index, folder, training))
   filtered_tiles = tiles.filter(lambda tile: keep_tile(tile, tile_size, tissue_threshold))
   samples = filtered_tiles.flatMap(lambda tile: process_tile(tile, sample_size, grayscale))
+  if normalize_stains:
+    samples = samples.map(lambda sample: normalize_staining(sample))
+  samples = samples.map(lambda sample: flatten_sample(sample))
   if training:
     # Append labels
     labels_df = get_labels_df(folder)
@@ -441,7 +581,6 @@ def preprocess(spark, slide_nums, folder="data", training=True, tile_size=1024,
   else:  # testing data -- no labels
     df = samples.toDF(["slide_num", "sample"])
     df = df.select(df.slide_num.astype("int"), df["sample"])
-  #df = df.repartition(num_partitions)  # HACK: Even out the partitions to avoid saving issues
   return df
 
 
@@ -451,7 +590,7 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
                     debug=False):
   """
   Split a DataFrame of slide samples into training and validation sets.
-  
+
   Args:
     spark: SparkSession.
     df: A Spark DataFrame in which each row contains the slide number,
@@ -466,7 +605,7 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
     add_row_indices: Boolean for whether or not to prepend an index
       column contain the row index for use downstream by SystemML.
       The column name will be "__INDEX".
-    
+
   Returns:
     A Spark DataFrame in which each row contains the slide number, tumor
     score, molecular score, and the sample stretched out into a Vector.
@@ -474,7 +613,7 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
   # Create DataFrame of labels for the given slide numbers.
   labels_df = get_labels_df(folder)
   labels_df = labels_df.loc[slide_nums]
-    
+
   # Randomly split slides 80%/20% into train and validation sets.
   train_nums_df = labels_df.sample(frac=train_frac, random_state=seed)
   val_nums_df = labels_df.drop(train_nums_df.index)
@@ -487,11 +626,11 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
                    .coalesce(1))
 
   # Note: Explicitly mark the smaller DataFrames as able to be broadcasted
-  # in order to have Catalyst choose the more efficient BroadcastHashJoin, 
+  # in order to have Catalyst choose the more efficient BroadcastHashJoin,
   # rather than the costly SortMergeJoin.
   train = df.join(F.broadcast(train_nums), on="slide_num")
   val = df.join(F.broadcast(val_nums), on="slide_num")
-  
+
   if debug:
     # DEBUG: Sanity checks.
     assert len(pd.merge(train_nums_df, val_nums_df, on="slide_num")) == 0
@@ -506,14 +645,14 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
     print(train.count(), val.count())
     #  - Check physical plans for broadcast join.
     print(train.explain(), val.explain())
-  
+
   # Add row indices for use with SystemML.
   if add_row_indices:
     train = (train.rdd
                   .zipWithIndex()
                   .map(lambda r: (r[1] + 1, *r[0]))  # flatten & convert index to 1-based indexing
                   .toDF(['__INDEX', 'slide_num', 'tumor_score', 'molecular_score', 'sample']))
-    train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"), 
+    train = train.select(train["__INDEX"].astype("int"), train.slide_num.astype("int"),
                          train.tumor_score.astype("int"), train.molecular_score, train["sample"])
 
     val = (val.rdd
@@ -531,7 +670,7 @@ def train_val_split(spark, df, slide_nums, folder, train_frac=0.8, add_row_indic
 def save(df, filepath, sample_size, grayscale, mode="error", format="parquet", file_size=128):
   """
   Save a preprocessed DataFrame with a constraint on the file sizes.
-  
+
   Args:
     df: A Spark DataFrame.
     filepath: Hadoop-supported path at which to save `df`.