You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2017/07/18 00:20:45 UTC

[2/5] systemml git commit: [SYSTEMML-1185][SYSTEMML-1766] Merge experimental breast cancer updates
diff --git a/projects/breast_cancer/bin/ b/projects/breast_cancer/bin/
new file mode 100755
index 0000000..d92ce87
--- /dev/null
+++ b/projects/breast_cancer/bin/
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+rm -rf metastore_db/
+rm -rf derby.log
+rm -rf spark-warehouse/
+rm -rf scratch_space/
diff --git a/projects/breast_cancer/bin/ b/projects/breast_cancer/bin/
new file mode 100755
index 0000000..b432e3f
--- /dev/null
+++ b/projects/breast_cancer/bin/
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+watch -n 0.5 nvidia-smi
diff --git a/projects/breast_cancer/bin/ b/projects/breast_cancer/bin/
new file mode 100755
index 0000000..2a7e903
--- /dev/null
+++ b/projects/breast_cancer/bin/
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ps -ef | grep `whoami` | grep "[p]ython" | awk '{print $2}' | xargs kill -9
+ps -ef | grep `whoami` | grep "[j]ava" | awk '{print $2}' | xargs kill -9
diff --git a/projects/breast_cancer/bin/ b/projects/breast_cancer/bin/
new file mode 100755
index 0000000..8445858
--- /dev/null
+++ b/projects/breast_cancer/bin/
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+tensorboard --logdir=experiments --reload_interval 5
diff --git a/projects/breast_cancer/breastcancer/convnet.dml b/projects/breast_cancer/breastcancer/convnet.dml
new file mode 100644
index 0000000..6cbea39
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/convnet.dml
@@ -0,0 +1,495 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ * Breast Cancer LeNet-like ConvNet Model
+ */
+# Imports
+source("nn/layers/affine.dml") as affine
+source("nn/layers/conv2d_builtin.dml") as conv2d
+source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
+source("nn/layers/dropout.dml") as dropout
+source("nn/layers/l2_reg.dml") as l2_reg
+source("nn/layers/max_pool2d_builtin.dml") as max_pool2d
+source("nn/layers/relu.dml") as relu
+source("nn/layers/softmax.dml") as softmax
+#source("nn/optim/adam.dml") as adam
+source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
+train = function(matrix[double] X, matrix[double] Y,
+                 matrix[double] X_val, matrix[double] Y_val,
+                 int C, int Hin, int Win,
+                 double lr, double mu, double decay, double lambda,
+                 int batch_size, int epochs, int log_interval,
+                 string checkpoint_dir)
+    return (matrix[double] Wc1, matrix[double] bc1,
+            matrix[double] Wc2, matrix[double] bc2,
+            matrix[double] Wc3, matrix[double] bc3,
+            matrix[double] Wa1, matrix[double] ba1,
+            matrix[double] Wa2, matrix[double] ba2) {
+  /*
+   * Trains a convolutional net using a "LeNet"-like architecture.
+   *
+   * The input matrix, X, has N examples, each represented as a 3D
+   * volume unrolled into a single vector.  The targets, Y, have K
+   * classes, and are one-hot encoded.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, C*Hin*Win).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - X_val: Input validation data matrix, of shape (N, C*Hin*Win).
+   *  - Y_val: Target validation matrix, of shape (N, K).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   *  - lr: Learning rate.
+   *  - mu: Momentum value.
+   *      Typical values are in the range of [0.5, 0.99], usually
+   *      started at the lower end and annealed towards the higher end.
+   *  - decay: Learning rate decay rate.
+   *  - lambda: Regularization strength.
+   *  - batch_size: Size of mini-batches to train on.
+   *  - epochs: Total number of full training loops over the full data set.
+   *  - log_interval: Interval, in iterations, between log outputs.
+   *  - checkpoint_dir: Directory to store model checkpoints.
+   *
+   * Outputs:
+   *  - Wc1: 1st layer weights (parameters) matrix, of shape (F1, C*Hf*Wf).
+   *  - bc1: 1st layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd layer weights (parameters) matrix, of shape (F2*(Hin/4)*(Win/4), N3).
+   *  - bc3: 3rd layer biases vector, of shape (1, N3).
+   *  - Wa2: 4th layer weights (parameters) matrix, of shape (N3, K).
+   *  - ba2: 4th layer biases vector, of shape (1, K).
+   */
+  N = nrow(X)
+  K = ncol(Y)
+  # Create network:
+  # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3
+  #  -> affine1 -> relu1 -> dropout1 -> affine2 -> softmax
+  Hf = 3  # filter height
+  Wf = 3  # filter width
+  stride = 1
+  pad = 1  # For same dimensions, (Hf - stride) / 2
+  F1 = 32  # num conv filters in conv1
+  F2 = 32  # num conv filters in conv2
+  F3 = 32  # num conv filters in conv3
+  N1 = 512  # num nodes in affine1
+  # Note: affine2 has K nodes, which is equal to the number of target dimensions (num classes)
+  [Wc1, bc1] = conv2d::init(F1, C, Hf, Wf)  # inputs: (N, C*Hin*Win)
+  [Wc2, bc2] = conv2d::init(F2, F1, Hf, Wf)  # inputs: (N, F1*(Hin/2)*(Win/2))
+  [Wc3, bc3] = conv2d::init(F3, F2, Hf, Wf)  # inputs: (N, F2*(Hin/2^2)*(Win/2^2))
+  [Wa1, ba1] = affine::init(F3*(Hin/2^3)*(Win/2^3), N1)  # inputs: (N, F3*(Hin/2^3)*(Win/2^3))
+  [Wa2, ba2] = affine::init(N1, K)  # inputs: (N, N1)
+  Wa2 = Wa2 / sqrt(2)  # different initialization, since being fed into softmax, instead of relu
+  # TODO: Compare optimizers once training is faster.
+  # Initialize SGD w/ Nesterov momentum optimizer
+  vWc1 = sgd_nesterov::init(Wc1); vbc1 = sgd_nesterov::init(bc1)
+  vWc2 = sgd_nesterov::init(Wc2); vbc2 = sgd_nesterov::init(bc2)
+  vWc3 = sgd_nesterov::init(Wc3); vbc3 = sgd_nesterov::init(bc3)
+  vWa1 = sgd_nesterov::init(Wa1); vba1 = sgd_nesterov::init(ba1)
+  vWa2 = sgd_nesterov::init(Wa2); vba2 = sgd_nesterov::init(ba2)
+  #[mWc1, vWc1] = adam::init(Wc1)  # optimizer 1st & 2nd moment state for Wc1
+  #[mbc1, vbc1] = adam::init(bc1)  # optimizer 1st & 2nd moment state for bc1
+  #[mWc2, vWc2] = adam::init(Wc2)  # optimizer 1st & 2nd moment state for Wc2
+  #[mbc2, vbc2] = adam::init(bc2)  # optimizer 1st & 2nd moment state for bc2
+  #[mWc3, vWc3] = adam::init(Wc3)  # optimizer 1st & 2nd moment state for Wc3
+  #[mbc3, vbc3] = adam::init(bc3)  # optimizer 1st & 2nd moment state for bc3
+  #[mWa1, vWa1] = adam::init(Wa1)  # optimizer 1st & 2nd moment state for Wa1
+  #[mba1, vba1] = adam::init(ba1)  # optimizer 1st & 2nd moment state for ba1
+  #[mWa2, vWa2] = adam::init(Wa2)  # optimizer 1st & 2nd moment state for Wa2
+  #[mba2, vba2] = adam::init(ba2)  # optimizer 1st & 2nd moment state for ba2
+  #beta1 = 0.9
+  #beta2 = 0.999
+  #eps = 1e-8
+  # TODO: Enable starting val metrics once fast, distributed predictions are available.
+  # Starting validation loss & accuracy
+  #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+  #loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+  #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+  ## Output results
+  #print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
+  # Optimize
+  print("Starting optimization")
+  iters = ceil(N / batch_size)
+  for (e in 1:epochs) {
+    for(i in 1:iters) {
+      # Get next batch
+      beg = ((i-1) * batch_size) %% N + 1
+      end = min(N, beg + batch_size - 1)
+      X_batch = X[beg:end,]
+      y_batch = Y[beg:end,]
+      # Compute forward pass
+      ## conv layer 1: conv1 -> relu1 -> pool1
+      [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf,
+                                                stride, stride, pad, pad)
+      outc1r = relu::forward(outc1)
+      [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+                                                       strideh=2, stridew=2, 0, 0)
+      ## conv layer 2: conv2 -> relu2 -> pool2
+      [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+                                                stride, stride, pad, pad)
+      outc2r = relu::forward(outc2)
+      [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+                                                       strideh=2, stridew=2, 0, 0)
+      ## conv layer 3: conv3 -> relu3 -> pool3
+      [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+                                                stride, stride, pad, pad)
+      outc3r = relu::forward(outc3)
+      [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+                                                       strideh=2, stridew=2, 0, 0)
+      ## affine layer 1:  affine1 -> relu1 -> dropout1
+      outa1 = affine::forward(outc3p, Wa1, ba1)
+      outa1r = relu::forward(outa1)
+      [outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+      ## affine layer 2:  affine2 -> softmax
+      outa2 = affine::forward(outa1d, Wa2, ba2)
+      probs = softmax::forward(outa2)
+      # Compute data backward pass
+      ## loss:
+      dprobs = cross_entropy_loss::backward(probs, y_batch)
+      ## affine layer 2:  affine2 -> softmax
+      douta2 = softmax::backward(dprobs, outa2)
+      [douta1d, dWa2, dba2] = affine::backward(douta2, outa1d, Wa2, ba2)
+      ## layer 3:  affine3 -> relu3 -> dropout
+      ## affine layer 1:  affine1 -> relu1 -> dropout
+      douta1r = dropout::backward(douta1d, outa1r, 0.5, maskad1)
+      douta1 = relu::backward(douta1r, outa1)
+      [doutc3p, dWa1, dba1] = affine::backward(douta1, outc3p, Wa1, ba1)
+      ## conv layer 3: conv3 -> relu3 -> pool3
+      doutc3r = max_pool2d::backward(doutc3p, Houtc3p, Woutc3p, outc3r, F3, Houtc3, Woutc3,
+                                     Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+      doutc3 = relu::backward(doutc3r, outc3)
+      [doutc2p, dWc3, dbc3] = conv2d::backward(doutc3, Houtc3, Woutc3, outc2p, Wc3, bc2, F2,
+                                               Houtc2p, Woutc2p, Hf, Wf, stride, stride, pad, pad)
+      ## conv layer 2: conv2 -> relu2 -> pool2
+      doutc2r = max_pool2d::backward(doutc2p, Houtc2p, Woutc2p, outc2r, F2, Houtc2, Woutc2,
+                                     Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+      doutc2 = relu::backward(doutc2r, outc2)
+      [doutc1p, dWc2, dbc2] = conv2d::backward(doutc2, Houtc2, Woutc2, outc1p, Wc2, bc2, F1,
+                                               Houtc1p, Woutc1p, Hf, Wf, stride, stride, pad, pad)
+      ## conv layer 1: conv1 -> relu1 -> pool1
+      doutc1r = max_pool2d::backward(doutc1p, Houtc1p, Woutc1p, outc1r, F1, Houtc1, Woutc1,
+                                     Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+      doutc1 = relu::backward(doutc1r, outc1)
+      [dX_batch, dWc1, dbc1] = conv2d::backward(doutc1, Houtc1, Woutc1, X_batch, Wc1, bc1, C,
+                                                Hin, Win, Hf, Wf, stride, stride, pad, pad)
+      # Compute regularization backward pass
+      dWc1_reg = l2_reg::backward(Wc1, lambda)
+      dWc2_reg = l2_reg::backward(Wc2, lambda)
+      dWc3_reg = l2_reg::backward(Wc3, lambda)
+      dWa1_reg = l2_reg::backward(Wa1, lambda)
+      dWa2_reg = l2_reg::backward(Wa2, lambda)
+      dWc1 = dWc1 + dWc1_reg
+      dWc2 = dWc2 + dWc2_reg
+      dWc3 = dWc3 + dWc3_reg
+      dWa1 = dWa1 + dWa1_reg
+      dWa2 = dWa2 + dWa2_reg
+      # Optimize with SGD w/ Nesterov momentum
+      [Wc1, vWc1] = sgd_nesterov::update(Wc1, dWc1, lr, mu, vWc1)
+      [bc1, vbc1] = sgd_nesterov::update(bc1, dbc1, lr, mu, vbc1)
+      [Wc2, vWc2] = sgd_nesterov::update(Wc2, dWc2, lr, mu, vWc2)
+      [bc2, vbc2] = sgd_nesterov::update(bc2, dbc2, lr, mu, vbc2)
+      [Wc3, vWc3] = sgd_nesterov::update(Wc3, dWc3, lr, mu, vWc3)
+      [bc3, vbc3] = sgd_nesterov::update(bc3, dbc3, lr, mu, vbc3)
+      [Wa1, vWa1] = sgd_nesterov::update(Wa1, dWa1, lr, mu, vWa1)
+      [ba1, vba1] = sgd_nesterov::update(ba1, dba1, lr, mu, vba1)
+      [Wa2, vWa2] = sgd_nesterov::update(Wa2, dWa2, lr, mu, vWa2)
+      [ba2, vba2] = sgd_nesterov::update(ba2, dba2, lr, mu, vba2)
+      #t = e*i - 1
+      #[Wc1, mWc1, vWc1] = adam::update(Wc1, dWc1, lr, beta1, beta2, eps, t, mWc1, vWc1)
+      #[bc1, mbc1, vbc1] = adam::update(bc1, dbc1, lr, beta1, beta2, eps, t, mbc1, vbc1)
+      #[Wc2, mWc2, vWc2] = adam::update(Wc2, dWc2, lr, beta1, beta2, eps, t, mWc2, vWc2)
+      #[bc2, mbc2, vbc2] = adam::update(bc2, dbc2, lr, beta1, beta2, eps, t, mbc2, vbc2)
+      #[Wc3, mWc3, vWc3] = adam::update(Wc3, dWc3, lr, beta1, beta2, eps, t, mWc3, vWc3)
+      #[bc3, mbc3, vbc3] = adam::update(bc3, dbc3, lr, beta1, beta2, eps, t, mbc3, vbc3)
+      #[Wa1, mWa1, vWa1] = adam::update(Wa1, dWa1, lr, beta1, beta2, eps, t, mWa1, vWa1)
+      #[ba1, mba1, vba1] = adam::update(ba1, dba1, lr, beta1, beta2, eps, t, mba1, vba1)
+      #[Wa2, mWa2, vWa2] = adam::update(Wa2, dWa2, lr, beta1, beta2, eps, t, mWa2, vWa2)
+      #[ba2, mba2, vba2] = adam::update(ba2, dba2, lr, beta1, beta2, eps, t, mba2, vba2)
+      # Compute loss & accuracy for training & validation data every `log_interval` iterations.
+      if (i %% log_interval == 0) {
+        # Compute training loss & accuracy
+        loss_data = cross_entropy_loss::forward(probs, y_batch)
+        loss_reg_Wc1 = l2_reg::forward(Wc1, lambda)
+        loss_reg_Wc2 = l2_reg::forward(Wc2, lambda)
+        loss_reg_Wc3 = l2_reg::forward(Wc3, lambda)
+        loss_reg_Wa1 = l2_reg::forward(Wa1, lambda)
+        loss_reg_Wa2 = l2_reg::forward(Wa2, lambda)
+        loss = loss_data + loss_reg_Wc1 + loss_reg_Wc2 + loss_reg_Wc3 + loss_reg_Wa1 + loss_reg_Wa2
+        accuracy = mean(rowIndexMax(probs) == rowIndexMax(y_batch))
+        # TODO: Consider enabling val metrics here once fast, distributed predictions are available.
+        ## Compute validation loss & accuracy
+        #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+        #loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+        #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+        ## Output results
+        #print("Epoch: " + e + ", Iter: " + i + ", Train Loss: " + loss + ", Train Accuracy: "
+        #      + accuracy + ", Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val
+        #      + ", lr: " + lr + ", mu " + mu)
+        # Output results
+        print("Epoch: " + e + "/" + epochs + ", Iter: " + i + "/" + iters
+              + ", Train Loss: " + loss + ", Train Accuracy: " + accuracy)
+      }
+    }
+    # Compute validation loss & accuracy for validation data every epoch
+    probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+    loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+    accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+    # Output results
+    print("Epoch: " + e + "/" + epochs + ", Val Loss: " + loss_val
+          + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu)
+    # Checkpoint model
+    dir = checkpoint_dir + e + "/"
+    dummy = checkpoint(dir, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+    str = "lr: " + lr + ", mu: " + mu + ", decay: " + decay + ", lambda: " + lambda
+          + ", batch_size: " + batch_size
+    name = dir + accuracy_val
+    write(str, name)
+    # Anneal momentum towards 0.999
+    mu = mu + (0.999 - mu)/(1+epochs-e)
+    # Decay learning rate
+    lr = lr * decay
+  }
+checkpoint = function(string dir,
+                      matrix[double] Wc1, matrix[double] bc1,
+                      matrix[double] Wc2, matrix[double] bc2,
+                      matrix[double] Wc3, matrix[double] bc3,
+                      matrix[double] Wa1, matrix[double] ba1,
+                      matrix[double] Wa2, matrix[double] ba2) {
+  /*
+   * Save the model parameters.
+   *
+   * Inputs:
+   *  - dir: Directory in which to save model parameters.
+   *  - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf).
+   *  - bc1: 1st conv layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd conv layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf).
+   *  - bc3: 3rd conv layer biases vector, of shape (F3, 1).
+   *  - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1).
+   *  - ba1: 1st affine layer biases vector, of shape (1, N1).
+   *  - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K).
+   *  - ba2: 2nd affine layer biases vector, of shape (1, K).
+   *
+   * Outputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   */
+  write(Wc1, dir + "Wc1", format="binary")
+  write(bc1, dir + "bc1", format="binary")
+  write(Wc2, dir + "Wc2", format="binary")
+  write(bc2, dir + "bc2", format="binary")
+  write(Wc3, dir + "Wc3", format="binary")
+  write(bc3, dir + "bc3", format="binary")
+  write(Wa1, dir + "Wa1", format="binary")
+  write(ba1, dir + "ba1", format="binary")
+  write(Wa2, dir + "Wa2", format="binary")
+  write(ba2, dir + "ba2", format="binary")
+predict = function(matrix[double] X, int C, int Hin, int Win,
+                   matrix[double] Wc1, matrix[double] bc1,
+                   matrix[double] Wc2, matrix[double] bc2,
+                   matrix[double] Wc3, matrix[double] bc3,
+                   matrix[double] Wa1, matrix[double] ba1,
+                   matrix[double] Wa2, matrix[double] ba2)
+    return (matrix[double] probs) {
+  /*
+   * Computes the class probability predictions of a convolutional
+   * net using the "LeNet" architecture.
+   *
+   * The input matrix, X, has N examples, each represented as a 3D
+   * volume unrolled into a single vector.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, C*Hin*Win).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   *  - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf).
+   *  - bc1: 1st conv layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd conv layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf).
+   *  - bc3: 3rd conv layer biases vector, of shape (F3, 1).
+   *  - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1).
+   *  - ba1: 1st affine layer biases vector, of shape (1, N1).
+   *  - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K).
+   *  - ba2: 2nd affine layer biases vector, of shape (1, K).
+   *
+   * Outputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   */
+  N = nrow(X)
+  # Network:
+  # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3
+  #  -> affine1 -> relu1 -> affine2 -> softmax
+  Hf = 3  # filter height
+  Wf = 3  # filter width
+  stride = 1
+  pad = 1  # For same dimensions, (Hf - stride) / 2
+  F1 = nrow(Wc1)  # num conv filters in conv1
+  F2 = nrow(Wc2)  # num conv filters in conv2
+  F3 = nrow(Wc3)  # num conv filters in conv3
+  N1 = ncol(Wa1)  # num nodes in affine1
+  K = ncol(Wa2)  # num nodes in affine2, equal to number of target dimensions (num classes)
+  # TODO: Implement fast, distributed conv & max pooling operators so that predictions
+  # can be computed in a full-batch, distributed manner.  Alternatively, improve `parfor`
+  # so that it can be efficiently used for parallel predictions.
+  ## Compute forward pass
+  ### conv layer 1: conv1 -> relu1 -> pool1
+  #[outc1, Houtc1, Woutc1] = conv2d::forward(X, Wc1, bc1, C, Hin, Win, Hf, Wf, stride, stride,
+  #                                          pad, pad)
+  #outc1r = relu::forward(outc1)
+  #[outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### conv layer 2: conv2 -> relu2 -> pool2
+  #[outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+  #                                          stride, stride, pad, pad)
+  #outc2r = relu::forward(outc2)
+  #[outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### conv layer 3: conv3 -> relu3 -> pool3
+  #[outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+  #                                          stride, stride, pad, pad)
+  #outc3r = relu::forward(outc3)
+  #[outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### affine layer 1:  affine1 -> relu1 -> dropout
+  #outa1 = affine::forward(outc3p, Wa1, ba1)
+  #outa1r = relu::forward(outa1)
+  ##[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+  ### affine layer 2:  affine2 -> softmax
+  #outa2 = affine::forward(outa1r, Wa2, ba2)
+  #probs = softmax::forward(outa2)
+  # Compute predictions over mini-batches
+  probs = matrix(0, rows=N, cols=K)
+  batch_size = 50
+  iters = ceil(N / batch_size)
+  for(i in 1:iters) {
+  # TODO: `parfor` should work here, possibly as an alternative to distributed predictions.
+  #parfor(i in 1:iters, check=0, mode=REMOTE_SPARK, resultmerge=REMOTE_SPARK) {
+    # Get next batch
+    beg = ((i-1) * batch_size) %% N + 1
+    end = min(N, beg + batch_size - 1)
+    X_batch = X[beg:end,]
+    # Compute forward pass
+    ## conv layer 1: conv1 -> relu1 -> pool1
+    [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc1r = relu::forward(outc1)
+    [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## conv layer 2: conv2 -> relu2 -> pool2
+    [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc2r = relu::forward(outc2)
+    [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## conv layer 3: conv3 -> relu3 -> pool3
+    [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc3r = relu::forward(outc3)
+    [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## affine layer 1:  affine1 -> relu1 -> dropout
+    outa1 = affine::forward(outc3p, Wa1, ba1)
+    outa1r = relu::forward(outa1)
+    #[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+    ## affine layer 2:  affine2 -> softmax
+    outa2 = affine::forward(outa1r, Wa2, ba2)
+    probs_batch = softmax::forward(outa2)
+    # Store predictions
+    probs[beg:end,] = probs_batch
+  }
+eval = function(matrix[double] probs, matrix[double] Y)
+    return (double loss, double accuracy) {
+  /*
+   * Evaluates a convolutional net using the "LeNet" architecture.
+   *
+   * The probs matrix contains the class probability predictions
+   * of K classes over N examples.  The targets, Y, have K classes,
+   * and are one-hot encoded.
+   *
+   * Inputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   *  - Y: Target matrix, of shape (N,
+   *
+   * Outputs:
+   *  - loss: Scalar loss, of shape (1).
+   *  - accuracy: Scalar accuracy, of shape (1).
+   */
+  # Compute loss & accuracy
+  loss = cross_entropy_loss::forward(probs, Y)
+  correct_pred = rowIndexMax(probs) == rowIndexMax(Y)
+  accuracy = mean(correct_pred)
+generate_dummy_data = function()
+    return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) {
+  /*
+   * Generate a dummy dataset similar to the breast cancer dataset.
+   *
+   * Outputs:
+   *  - X: Input data matrix, of shape (N, D).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   */
+  # Generate dummy input data
+  N = 1024  # num examples
+  C = 3  # num input channels
+  Hin = 256  # input height
+  Win = 256  # input width
+  K = 3  # num target classes
+  X = rand(rows=N, cols=C*Hin*Win, pdf="normal")
+  classes = round(rand(rows=N, cols=1, min=1, max=K, pdf="uniform"))
+  Y = table(seq(1, N), classes)  # one-hot encoding
diff --git a/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml b/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml
new file mode 100644
index 0000000..0c5869e
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/convnet_distrib_sgd.dml
@@ -0,0 +1,592 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ * Breast Cancer LeNet-like ConvNet Model
+ */
+# Imports
+source("nn/layers/affine.dml") as affine
+source("nn/layers/conv2d_builtin.dml") as conv2d
+source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
+source("nn/layers/dropout.dml") as dropout
+source("nn/layers/l2_reg.dml") as l2_reg
+source("nn/layers/max_pool2d_builtin.dml") as max_pool2d
+source("nn/layers/relu.dml") as relu
+source("nn/layers/softmax.dml") as softmax
+#source("nn/optim/adam.dml") as adam
+source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
+train = function(matrix[double] X, matrix[double] Y,
+                 matrix[double] X_val, matrix[double] Y_val,
+                 int C, int Hin, int Win,
+                 double lr, double mu, double decay, double lambda,
+                 int batch_size, int parallel_batches, int epochs,
+                 int log_interval, string checkpoint_dir)
+    return (matrix[double] Wc1, matrix[double] bc1,
+            matrix[double] Wc2, matrix[double] bc2,
+            matrix[double] Wc3, matrix[double] bc3,
+            matrix[double] Wa1, matrix[double] ba1,
+            matrix[double] Wa2, matrix[double] ba2) {
+  /*
+   * Trains a convolutional net using a "LeNet"-like architecture.
+   *
+   * The input matrix, X, has N examples, each represented as a 3D
+   * volume unrolled into a single vector.  The targets, Y, have K
+   * classes, and are one-hot encoded.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, C*Hin*Win).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - X_val: Input validation data matrix, of shape (N, C*Hin*Win).
+   *  - Y_val: Target validation matrix, of shape (N, K).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   *  - lr: Learning rate.
+   *  - mu: Momentum value.
+   *      Typical values are in the range of [0.5, 0.99], usually
+   *      started at the lower end and annealed towards the higher end.
+   *  - decay: Learning rate decay rate.
+   *  - lambda: Regularization strength.
+   *  - batch_size: Size of mini-batches to train on.
+   *  - parallel_batches: Number of parallel batches to run for
+   *      distributed SGD.
+   *  - epochs: Total number of full training loops over the full data
+   *      set.
+   *  - log_interval: Interval, in iterations, between log outputs.
+   *  - checkpoint_dir: Directory to store model checkpoints.
+   *
+   * Outputs:
+   *  - Wc1: 1st layer weights (parameters) matrix, of
+   *      shape (F1, C*Hf*Wf).
+   *  - bc1: 1st layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd layer weights (parameters) matrix, of
+   *      shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd layer weights (parameters) matrix, of
+   *      shape (F2*(Hin/4)*(Win/4), N3).
+   *  - bc3: 3rd layer biases vector, of shape (1, N3).
+   *  - Wa2: 4th layer weights (parameters) matrix, of shape (N3, K).
+   *  - ba2: 4th layer biases vector, of shape (1, K).
+   */
+  N = nrow(X)
+  K = ncol(Y)
+  # Create network:
+  # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3
+  #  -> affine1 -> relu1 -> dropout1 -> affine2 -> softmax
+  Hf = 3  # filter height
+  Wf = 3  # filter width
+  stride = 1
+  pad = 1  # For same dimensions, (Hf - stride) / 2
+  F1 = 32  # num conv filters in conv1
+  F2 = 32  # num conv filters in conv2
+  F3 = 32  # num conv filters in conv3
+  N1 = 256  # num nodes in affine1
+  # Note: affine2 has K nodes, which is equal to the number of target dimensions (num classes)
+  [Wc1, bc1] = conv2d::init(F1, C, Hf, Wf)  # inputs: (N, C*Hin*Win)
+  [Wc2, bc2] = conv2d::init(F2, F1, Hf, Wf)  # inputs: (N, F1*(Hin/2)*(Win/2))
+  [Wc3, bc3] = conv2d::init(F3, F2, Hf, Wf)  # inputs: (N, F2*(Hin/2^2)*(Win/2^2))
+  [Wa1, ba1] = affine::init(F3*(Hin/2^3)*(Win/2^3), N1)  # inputs: (N, F3*(Hin/2^3)*(Win/2^3))
+  [Wa2, ba2] = affine::init(N1, K)  # inputs: (N, N1)
+  Wa2 = Wa2 / sqrt(2)  # different initialization, since being fed into softmax, instead of relu
+  # TODO: Compare optimizers once training is faster.
+  # Initialize SGD w/ Nesterov momentum optimizer
+  vWc1 = sgd_nesterov::init(Wc1); vbc1 = sgd_nesterov::init(bc1)
+  vWc2 = sgd_nesterov::init(Wc2); vbc2 = sgd_nesterov::init(bc2)
+  vWc3 = sgd_nesterov::init(Wc3); vbc3 = sgd_nesterov::init(bc3)
+  vWa1 = sgd_nesterov::init(Wa1); vba1 = sgd_nesterov::init(ba1)
+  vWa2 = sgd_nesterov::init(Wa2); vba2 = sgd_nesterov::init(ba2)
+  #[mWc1, vWc1] = adam::init(Wc1)  # optimizer 1st & 2nd moment state for Wc1
+  #[mbc1, vbc1] = adam::init(bc1)  # optimizer 1st & 2nd moment state for bc1
+  #[mWc2, vWc2] = adam::init(Wc2)  # optimizer 1st & 2nd moment state for Wc2
+  #[mbc2, vbc2] = adam::init(bc2)  # optimizer 1st & 2nd moment state for bc2
+  #[mWc3, vWc3] = adam::init(Wc3)  # optimizer 1st & 2nd moment state for Wc3
+  #[mbc3, vbc3] = adam::init(bc3)  # optimizer 1st & 2nd moment state for bc3
+  #[mWa1, vWa1] = adam::init(Wa1)  # optimizer 1st & 2nd moment state for Wa1
+  #[mba1, vba1] = adam::init(ba1)  # optimizer 1st & 2nd moment state for ba1
+  #[mWa2, vWa2] = adam::init(Wa2)  # optimizer 1st & 2nd moment state for Wa2
+  #[mba2, vba2] = adam::init(ba2)  # optimizer 1st & 2nd moment state for ba2
+  #beta1 = 0.9
+  #beta2 = 0.999
+  #eps = 1e-8
+  # TODO: Enable starting val metrics once fast, distributed predictions are available.
+  # Starting validation loss & accuracy
+  #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+  #loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+  #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+  ## Output results
+  #print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
+  # Optimize
+  print("Starting optimization")
+  group_batch_size = parallel_batches*batch_size
+  groups = ceil(N/group_batch_size)
+  print("Total Epochs: "+epochs+", Batch size: "+batch_size+
+        ", Degree of parallelism: "+parallel_batches+", Group batch size: "+group_batch_size+
+        ", Num groups: "+groups)
+  # Loop over the dataset multiple times
+  for (e in 1:epochs) {
+    # Grab groups of mini-batches
+    for (g in 1:groups) {
+      # Get next group of mini-batches
+      # NOTE: At the end of the dataset, the last mini-batch in this group could be smaller than
+      # the other groups.
+      #group_beg = ((g-1) * group_batch_size) %% N + 1
+      #group_end = min(N, group_beg + group_batch_size - 1)
+      #X_group_batch = X[group_beg:group_end,]
+      #Y_group_batch = Y[group_beg:group_end,]
+      group_beg = 1
+      group_end = group_batch_size
+      X_group_batch = X[group_beg:group_end,]
+      Y_group_batch = Y[group_beg:group_end,]
+      # Create data structure to store gradients computed in parallel
+      dWc1_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc1)*ncol(Wc1))
+      dWc2_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc2)*ncol(Wc2))
+      dWc3_agg = matrix(0, rows=parallel_batches, cols=nrow(Wc3)*ncol(Wc3))
+      dWa1_agg = matrix(0, rows=parallel_batches, cols=nrow(Wa1)*ncol(Wa1))
+      dWa2_agg = matrix(0, rows=parallel_batches, cols=nrow(Wa2)*ncol(Wa2))
+      dbc1_agg = matrix(0, rows=parallel_batches, cols=nrow(bc1)*ncol(bc1))
+      dbc2_agg = matrix(0, rows=parallel_batches, cols=nrow(bc2)*ncol(bc2))
+      dbc3_agg = matrix(0, rows=parallel_batches, cols=nrow(bc3)*ncol(bc3))
+      dba1_agg = matrix(0, rows=parallel_batches, cols=nrow(ba1)*ncol(ba1))
+      dba2_agg = matrix(0, rows=parallel_batches, cols=nrow(ba2)*ncol(ba2))
+      # Run graph on each mini-batch in this group in parallel (ideally on multiple GPUs)
+      # NOTE: The parfor is causing the sizes to not be propagated into the body both here and
+      # in `predict`.  It is not caused by the batch extraction below.  It is the parfor.
+      parfor (j in 1:parallel_batches, mode=REMOTE_SPARK, opt=CONSTRAINED) {
+      #parfor (j in 1:parallel_batches) {
+        # Get a mini-batch in this group
+        beg = ((j-1) * batch_size) %% nrow(X_group_batch) + 1
+        end = min(nrow(X_group_batch), beg + batch_size - 1)
+        X_batch = X_group_batch[beg:end,]
+        Y_batch = Y_group_batch[beg:end,]
+        #beg = 1
+        #end = batch_size
+        #X_batch = X_group_batch[beg:end,]
+        #Y_batch = Y_group_batch[beg:end,]
+        # Compute forward pass
+        ## conv layer 1: conv1 -> relu1 -> pool1
+        [outc1, Houtc1, Woutc1] = conv2d::forward(X_batch, Wc1, bc1, C, Hin, Win, Hf, Wf,
+                                                  stride, stride, pad, pad)
+        outc1r = relu::forward(outc1)
+        [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+                                                         strideh=2, stridew=2, 0, 0)
+        ## conv layer 2: conv2 -> relu2 -> pool2
+        [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+                                                  stride, stride, pad, pad)
+        outc2r = relu::forward(outc2)
+        [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+                                                         strideh=2, stridew=2, 0, 0)
+        ## conv layer 3: conv3 -> relu3 -> pool3
+        [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+                                                  stride, stride, pad, pad)
+        outc3r = relu::forward(outc3)
+        [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+                                                         strideh=2, stridew=2, 0, 0)
+        ## affine layer 1:  affine1 -> relu1 -> dropout1
+        outa1 = affine::forward(outc3p, Wa1, ba1)
+        outa1r = relu::forward(outa1)
+        [outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+        ## affine layer 2:  affine2 -> softmax
+        outa2 = affine::forward(outa1d, Wa2, ba2)
+        probs = softmax::forward(outa2)
+        # Compute data backward pass
+        ## loss:
+        dprobs = cross_entropy_loss::backward(probs, Y_batch)
+        ## affine layer 2:  affine2 -> softmax
+        douta2 = softmax::backward(dprobs, outa2)
+        [douta1d, dWa2, dba2] = affine::backward(douta2, outa1d, Wa2, ba2)
+        ## layer 3:  affine3 -> relu3 -> dropout
+        ## affine layer 1:  affine1 -> relu1 -> dropout
+        douta1r = dropout::backward(douta1d, outa1r, 0.5, maskad1)
+        douta1 = relu::backward(douta1r, outa1)
+        [doutc3p, dWa1, dba1] = affine::backward(douta1, outc3p, Wa1, ba1)
+        ## conv layer 3: conv3 -> relu3 -> pool3
+        doutc3r = max_pool2d::backward(doutc3p, Houtc3p, Woutc3p, outc3r, F3, Houtc3, Woutc3,
+                                       Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+        doutc3 = relu::backward(doutc3r, outc3)
+        [doutc2p, dWc3, dbc3] = conv2d::backward(doutc3, Houtc3, Woutc3, outc2p, Wc3, bc2, F2,
+                                                 Houtc2p, Woutc2p, Hf, Wf, stride, stride, pad, pad)
+        ## conv layer 2: conv2 -> relu2 -> pool2
+        doutc2r = max_pool2d::backward(doutc2p, Houtc2p, Woutc2p, outc2r, F2, Houtc2, Woutc2,
+                                       Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+        doutc2 = relu::backward(doutc2r, outc2)
+        [doutc1p, dWc2, dbc2] = conv2d::backward(doutc2, Houtc2, Woutc2, outc1p, Wc2, bc2, F1,
+                                                 Houtc1p, Woutc1p, Hf, Wf, stride, stride, pad, pad)
+        ## conv layer 1: conv1 -> relu1 -> pool1
+        doutc1r = max_pool2d::backward(doutc1p, Houtc1p, Woutc1p, outc1r, F1, Houtc1, Woutc1,
+                                       Hf=2, Wf=2, strideh=2, stridew=2, 0, 0)
+        doutc1 = relu::backward(doutc1r, outc1)
+        [dX_batch, dWc1, dbc1] = conv2d::backward(doutc1, Houtc1, Woutc1, X_batch, Wc1, bc1, C,
+                                                  Hin, Win, Hf, Wf, stride, stride, pad, pad)
+        # Compute regularization backward pass on weights
+        dWc1_reg = l2_reg::backward(Wc1, lambda)
+        dWc2_reg = l2_reg::backward(Wc2, lambda)
+        dWc3_reg = l2_reg::backward(Wc3, lambda)
+        dWa1_reg = l2_reg::backward(Wa1, lambda)
+        dWa2_reg = l2_reg::backward(Wa2, lambda)
+        dWc1 = dWc1 + dWc1_reg
+        dWc2 = dWc2 + dWc2_reg
+        dWc3 = dWc3 + dWc3_reg
+        dWa1 = dWa1 + dWa1_reg
+        dWa2 = dWa2 + dWa2_reg
+        # Flatten and store gradients for this parallel execution
+        # Note: We multiply by a weighting to allow for proper gradient averaging during the
+        # aggregation even with uneven batch sizes.
+        weighting = nrow(X_batch) / nrow(X_group_batch)
+        dWc1_agg[j,] = matrix(dWc1, rows=1, cols=nrow(Wc1)*ncol(Wc1)) * weighting
+        dWc2_agg[j,] = matrix(dWc2, rows=1, cols=nrow(Wc2)*ncol(Wc2)) * weighting
+        dWc3_agg[j,] = matrix(dWc3, rows=1, cols=nrow(Wc3)*ncol(Wc3)) * weighting
+        dWa1_agg[j,] = matrix(dWa1, rows=1, cols=nrow(Wa1)*ncol(Wa1)) * weighting
+        dWa2_agg[j,] = matrix(dWa2, rows=1, cols=nrow(Wa2)*ncol(Wa2)) * weighting
+        dbc1_agg[j,] = matrix(dbc1, rows=1, cols=nrow(bc1)*ncol(bc1)) * weighting
+        dbc2_agg[j,] = matrix(dbc2, rows=1, cols=nrow(bc2)*ncol(bc2)) * weighting
+        dbc3_agg[j,] = matrix(dbc3, rows=1, cols=nrow(bc3)*ncol(bc3)) * weighting
+        dba1_agg[j,] = matrix(dba1, rows=1, cols=nrow(ba1)*ncol(ba1)) * weighting
+        dba2_agg[j,] = matrix(dba2, rows=1, cols=nrow(ba2)*ncol(ba2)) * weighting
+      }
+      # Aggregate gradients
+      # Note: The gradients are already pre-multiplied by a weight so that addition here
+      # results in gradient averaging even with different possible mini-batch sizes. I.e.,
+      # the final mini-batch at the end of the dataset could be smaller than the other mini-batches.
+      dWc1 = matrix(colSums(dWc1_agg), rows=nrow(Wc1), cols=ncol(Wc1))
+      dWc2 = matrix(colSums(dWc2_agg), rows=nrow(Wc2), cols=ncol(Wc2))
+      dWc3 = matrix(colSums(dWc3_agg), rows=nrow(Wc3), cols=ncol(Wc3))
+      dWa1 = matrix(colSums(dWa1_agg), rows=nrow(Wa1), cols=ncol(Wa1))
+      dWa2 = matrix(colSums(dWa2_agg), rows=nrow(Wa2), cols=ncol(Wa2))
+      dbc1 = matrix(colSums(dbc1_agg), rows=nrow(bc1), cols=ncol(bc1))
+      dbc2 = matrix(colSums(dbc2_agg), rows=nrow(bc2), cols=ncol(bc2))
+      dbc3 = matrix(colSums(dbc3_agg), rows=nrow(bc3), cols=ncol(bc3))
+      dba1 = matrix(colSums(dba1_agg), rows=nrow(ba1), cols=ncol(ba1))
+      dba2 = matrix(colSums(dba2_agg), rows=nrow(ba2), cols=ncol(ba2))
+      # Optimize with SGD w/ Nesterov momentum
+      [Wc1, vWc1] = sgd_nesterov::update(Wc1, dWc1, lr, mu, vWc1)
+      [Wc2, vWc2] = sgd_nesterov::update(Wc2, dWc2, lr, mu, vWc2)
+      [Wc3, vWc3] = sgd_nesterov::update(Wc3, dWc3, lr, mu, vWc3)
+      [Wa1, vWa1] = sgd_nesterov::update(Wa1, dWa1, lr, mu, vWa1)
+      [Wa2, vWa2] = sgd_nesterov::update(Wa2, dWa2, lr, mu, vWa2)
+      [bc1, vbc1] = sgd_nesterov::update(bc1, dbc1, lr, mu, vbc1)
+      [bc2, vbc2] = sgd_nesterov::update(bc2, dbc2, lr, mu, vbc2)
+      [bc3, vbc3] = sgd_nesterov::update(bc3, dbc3, lr, mu, vbc3)
+      [ba1, vba1] = sgd_nesterov::update(ba1, dba1, lr, mu, vba1)
+      [ba2, vba2] = sgd_nesterov::update(ba2, dba2, lr, mu, vba2)
+      #t = e*i - 1
+      #[Wc1, mWc1, vWc1] = adam::update(Wc1, dWc1, lr, beta1, beta2, eps, t, mWc1, vWc1)
+      #[bc1, mbc1, vbc1] = adam::update(bc1, dbc1, lr, beta1, beta2, eps, t, mbc1, vbc1)
+      #[Wc2, mWc2, vWc2] = adam::update(Wc2, dWc2, lr, beta1, beta2, eps, t, mWc2, vWc2)
+      #[bc2, mbc2, vbc2] = adam::update(bc2, dbc2, lr, beta1, beta2, eps, t, mbc2, vbc2)
+      #[Wc3, mWc3, vWc3] = adam::update(Wc3, dWc3, lr, beta1, beta2, eps, t, mWc3, vWc3)
+      #[bc3, mbc3, vbc3] = adam::update(bc3, dbc3, lr, beta1, beta2, eps, t, mbc3, vbc3)
+      #[Wa1, mWa1, vWa1] = adam::update(Wa1, dWa1, lr, beta1, beta2, eps, t, mWa1, vWa1)
+      #[ba1, mba1, vba1] = adam::update(ba1, dba1, lr, beta1, beta2, eps, t, mba1, vba1)
+      #[Wa2, mWa2, vWa2] = adam::update(Wa2, dWa2, lr, beta1, beta2, eps, t, mWa2, vWa2)
+      #[ba2, mba2, vba2] = adam::update(ba2, dba2, lr, beta1, beta2, eps, t, mba2, vba2)
+      # Compute loss & accuracy for training data every `log_interval` iterations.
+      if (g %% log_interval == 0) {
+        print("Logging training loss & accuracy for group " + g + ":")
+        # Get a mini-batch in this group
+        #j = 0
+        #beg = ((j-1) * batch_size) %% nrow(X_group_batch) + 1
+        #end = min(nrow(X_group_batch), beg + batch_size - 1)
+        #X_batch = X_group_batch[beg:end,]
+        #Y_batch = Y_group_batch[beg:end,]
+        # Compute training loss & accuracy using final
+        #probs = predict(X_batch, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+        #loss_data = cross_entropy_loss::forward(probs, Y_batch)
+        probs = predict(X_group_batch, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1,
+                        Wa2, ba2, batch_size)
+        loss_data = cross_entropy_loss::forward(probs, Y_group_batch)
+        loss_reg_Wc1 = l2_reg::forward(Wc1, lambda)
+        loss_reg_Wc2 = l2_reg::forward(Wc2, lambda)
+        loss_reg_Wc3 = l2_reg::forward(Wc3, lambda)
+        loss_reg_Wa1 = l2_reg::forward(Wa1, lambda)
+        loss_reg_Wa2 = l2_reg::forward(Wa2, lambda)
+        loss = loss_data + loss_reg_Wc1 + loss_reg_Wc2 + loss_reg_Wc3 + loss_reg_Wa1 + loss_reg_Wa2
+        #accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_batch))
+        accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_group_batch))
+        # TODO: Consider enabling val metrics here once fast, distributed predictions are available.
+        ## Compute validation loss & accuracy
+        #probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+        #loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+        #accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+        # Output results
+        print("Epoch: " + e + ", Group: " + g + ", Train Loss: " + loss + ", Train Accuracy: "
+              + accuracy) # + ", Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
+      }
+    }
+    # Compute validation loss & accuracy for validation data every epoch
+    print("Logging validation loss & accuracy.")
+    probs_val = predict(X_val, C, Hin, Win, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2,
+                        batch_size)
+    loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+    accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+    # Output results
+    print("Epoch: " + e + "/" + epochs + ", Val Loss: " + loss_val
+          + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu)
+    # Checkpoint model
+    dir = checkpoint_dir + e + "/"
+    dummy = checkpoint(dir, Wc1, bc1, Wc2, bc2, Wc3, bc3, Wa1, ba1, Wa2, ba2)
+    str = "lr: " + lr + ", mu: " + mu + ", decay: " + decay + ", lambda: " + lambda
+          + ", batch_size: " + batch_size
+    name = dir + accuracy_val
+    write(str, name)
+    # Anneal momentum towards 0.999
+    mu = mu + (0.999 - mu)/(1+epochs-e)
+    # Decay learning rate
+    lr = lr * decay
+  }
+checkpoint = function(string dir,
+                      matrix[double] Wc1, matrix[double] bc1,
+                      matrix[double] Wc2, matrix[double] bc2,
+                      matrix[double] Wc3, matrix[double] bc3,
+                      matrix[double] Wa1, matrix[double] ba1,
+                      matrix[double] Wa2, matrix[double] ba2) {
+  /*
+   * Save the model parameters.
+   *
+   * Inputs:
+   *  - dir: Directory in which to save model parameters.
+   *  - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf).
+   *  - bc1: 1st conv layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd conv layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf).
+   *  - bc3: 3rd conv layer biases vector, of shape (F3, 1).
+   *  - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1).
+   *  - ba1: 1st affine layer biases vector, of shape (1, N1).
+   *  - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K).
+   *  - ba2: 2nd affine layer biases vector, of shape (1, K).
+   *
+   * Outputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   */
+  write(Wc1, dir + "Wc1", format="binary")
+  write(bc1, dir + "bc1", format="binary")
+  write(Wc2, dir + "Wc2", format="binary")
+  write(bc2, dir + "bc2", format="binary")
+  write(Wc3, dir + "Wc3", format="binary")
+  write(bc3, dir + "bc3", format="binary")
+  write(Wa1, dir + "Wa1", format="binary")
+  write(ba1, dir + "ba1", format="binary")
+  write(Wa2, dir + "Wa2", format="binary")
+  write(ba2, dir + "ba2", format="binary")
+predict = function(matrix[double] X, int C, int Hin, int Win,
+                   matrix[double] Wc1, matrix[double] bc1,
+                   matrix[double] Wc2, matrix[double] bc2,
+                   matrix[double] Wc3, matrix[double] bc3,
+                   matrix[double] Wa1, matrix[double] ba1,
+                   matrix[double] Wa2, matrix[double] ba2,
+                   int batch_size)
+    return (matrix[double] probs) {
+  /*
+   * Computes the class probability predictions of a convolutional
+   * net using the "LeNet" architecture.
+   *
+   * The input matrix, X, has N examples, each represented as a 3D
+   * volume unrolled into a single vector.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, C*Hin*Win).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   *  - Wc1: 1st conv layer weights (parameters) matrix, of shape (F1, C*Hf*Wf).
+   *  - bc1: 1st conv layer biases vector, of shape (F1, 1).
+   *  - Wc2: 2nd conv layer weights (parameters) matrix, of shape (F2, F1*Hf*Wf).
+   *  - bc2: 2nd conv layer biases vector, of shape (F2, 1).
+   *  - Wc3: 3rd conv layer weights (parameters) matrix, of shape (F3, F2*Hf*Wf).
+   *  - bc3: 3rd conv layer biases vector, of shape (F3, 1).
+   *  - Wa1: 1st affine layer weights (parameters) matrix, of shape (F3*(Hin/2^3)*(Win/2^1), N1).
+   *  - ba1: 1st affine layer biases vector, of shape (1, N1).
+   *  - Wa2: 2nd affine layer weights (parameters) matrix, of shape (N1, K).
+   *  - ba2: 2nd affine layer biases vector, of shape (1, K).
+   *  - batch_size: Size of mini-batches to train on.
+   *
+   * Outputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   */
+  N = nrow(X)
+  # Network:
+  # conv1 -> relu1 -> pool1 -> conv2 -> relu2 -> pool2 -> conv3 -> relu3 -> pool3
+  #  -> affine1 -> relu1 -> affine2 -> softmax
+  Hf = 3  # filter height
+  Wf = 3  # filter width
+  stride = 1
+  pad = 1  # For same dimensions, (Hf - stride) / 2
+  F1 = nrow(Wc1)  # num conv filters in conv1
+  F2 = nrow(Wc2)  # num conv filters in conv2
+  F3 = nrow(Wc3)  # num conv filters in conv3
+  N1 = ncol(Wa1)  # num nodes in affine1
+  K = ncol(Wa2)  # num nodes in affine2, equal to number of target dimensions (num classes)
+  # TODO: Implement fast, distributed conv & max pooling operators so that predictions
+  # can be computed in a full-batch, distributed manner.  Alternatively, improve `parfor`
+  # so that it can be efficiently used for parallel predictions.
+  ## Compute forward pass
+  ### conv layer 1: conv1 -> relu1 -> pool1
+  #[outc1, Houtc1, Woutc1] = conv2d::forward(X, Wc1, bc1, C, Hin, Win, Hf, Wf, stride, stride,
+  #                                          pad, pad)
+  #outc1r = relu::forward(outc1)
+  #[outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### conv layer 2: conv2 -> relu2 -> pool2
+  #[outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+  #                                          stride, stride, pad, pad)
+  #outc2r = relu::forward(outc2)
+  #[outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### conv layer 3: conv3 -> relu3 -> pool3
+  #[outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+  #                                          stride, stride, pad, pad)
+  #outc3r = relu::forward(outc3)
+  #[outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+  #                                                 strideh=2, stridew=2, 0, 0)
+  ### affine layer 1:  affine1 -> relu1 -> dropout
+  #outa1 = affine::forward(outc3p, Wa1, ba1)
+  #outa1r = relu::forward(outa1)
+  ##[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+  ### affine layer 2:  affine2 -> softmax
+  #outa2 = affine::forward(outa1r, Wa2, ba2)
+  #probs = softmax::forward(outa2)
+  # Compute predictions over mini-batches
+  probs = matrix(0, rows=N, cols=K)
+  #batch_size = 32
+  #iters = ceil(N / batch_size)
+  # TODO: `parfor` should work here, possibly as an alternative to distributed predictions.
+  #for (i in 1:iters) {
+  #parfor (i in 1:iters, check=0, mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  #parfor (i in 1:iters, check=0) {  # complains about `probs` as an inter-loop dependency
+  parfor (i in 1:N, check=0, mode=REMOTE_SPARK, opt=CONSTRAINED) {
+  #parfor (i in 1:N) {
+    ## Get next batch
+    #beg = ((i-1) * batch_size) %% N + 1
+    #end = min(N, beg + batch_size - 1)
+    #X_batch = X[beg:end,]
+    #X_batch = X[i,]
+    # Compute forward pass
+    ## conv layer 1: conv1 -> relu1 -> pool1
+    [outc1, Houtc1, Woutc1] = conv2d::forward(X[i,], Wc1, bc1, C, Hin, Win, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc1r = relu::forward(outc1)
+    [outc1p, Houtc1p, Woutc1p] = max_pool2d::forward(outc1r, F1, Houtc1, Woutc1, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## conv layer 2: conv2 -> relu2 -> pool2
+    [outc2, Houtc2, Woutc2] = conv2d::forward(outc1p, Wc2, bc2, F1, Houtc1p, Woutc1p, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc2r = relu::forward(outc2)
+    [outc2p, Houtc2p, Woutc2p] = max_pool2d::forward(outc2r, F2, Houtc2, Woutc2, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## conv layer 3: conv3 -> relu3 -> pool3
+    [outc3, Houtc3, Woutc3] = conv2d::forward(outc2p, Wc3, bc3, F2, Houtc2p, Woutc2p, Hf, Wf,
+                                              stride, stride, pad, pad)
+    outc3r = relu::forward(outc3)
+    [outc3p, Houtc3p, Woutc3p] = max_pool2d::forward(outc3r, F3, Houtc3, Woutc3, Hf=2, Wf=2,
+                                                     strideh=2, stridew=2, 0, 0)
+    ## affine layer 1:  affine1 -> relu1 -> dropout
+    outa1 = affine::forward(outc3p, Wa1, ba1)
+    outa1r = relu::forward(outa1)
+    #[outa1d, maskad1] = dropout::forward(outa1r, 0.5, -1)
+    ## affine layer 2:  affine2 -> softmax
+    outa2 = affine::forward(outa1r, Wa2, ba2)
+    probs_batch = softmax::forward(outa2)
+    # Store predictions
+    #probs[beg:end,] = probs_batch
+    probs[i,] = probs_batch
+  }
+eval = function(matrix[double] probs, matrix[double] Y)
+    return (double loss, double accuracy) {
+  /*
+   * Evaluates a convolutional net using the "LeNet" architecture.
+   *
+   * The probs matrix contains the class probability predictions
+   * of K classes over N examples.  The targets, Y, have K classes,
+   * and are one-hot encoded.
+   *
+   * Inputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   *  - Y: Target matrix, of shape (N,
+   *
+   * Outputs:
+   *  - loss: Scalar loss, of shape (1).
+   *  - accuracy: Scalar accuracy, of shape (1).
+   */
+  # Compute loss & accuracy
+  loss = cross_entropy_loss::forward(probs, Y)
+  correct_pred = rowIndexMax(probs) == rowIndexMax(Y)
+  accuracy = mean(correct_pred)
+generate_dummy_data = function(int N)
+    return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) {
+  /*
+   * Generate a dummy dataset similar to the breast cancer dataset.
+   *
+   * Inputs:
+   *  - N: Number of examples to generate.
+   *
+   * Outputs:
+   *  - X: Input data matrix, of shape (N, D).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   */
+  # Generate dummy input data
+  #N = 1024  # num examples
+  C = 3  # num input channels
+  Hin = 256  # input height
+  Win = 256  # input width
+  K = 3  # num target classes
+  X = rand(rows=N, cols=C*Hin*Win, pdf="normal")
+  classes = round(rand(rows=N, cols=1, min=1, max=K, pdf="uniform"))
+  Y = table(seq(1, N), classes, N, K)  # one-hot encoding
diff --git a/projects/breast_cancer/breastcancer/ b/projects/breast_cancer/breastcancer/
new file mode 100644
index 0000000..6e65a3e
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/
@@ -0,0 +1,229 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+Data utilities for the TUPAC16 breast cancer project.
+This is early, experimental code.
+TODO: Cleanup & add proper comments to all functions.
+import multiprocessing as mp
+import os
+import threading
+import numpy as np
+import py4j
+# Utils for reading data
+def compute_channel_means(rdd, channels, size):
+  """Compute the means of each color channel across the dataset."""
+  # TODO: Replace this with
+  # to cut vector into separate channel vectors, then grab the mean
+  # of those new columns, all using DataFrame functions, rather than
+  # RDD functions.
+  # from import VectorUDT
+  # from pyspark.sql.functions import udf
+  # from pyspark.sql import functions as F
+  # as_ml = udf(lambda v: v.asML() if v is not None else None, VectorUDT())
+  # slicers[0].transform(train_df.withColumn("sample", as_ml("sample"))).select(F.avg("ch0"))
+  # slicers = [VectorSlicer(inputCol="sample", outputCol="ch{}".format(c), indices=range(c*pixels, c*pixels + pixels)) for c in range(CHANNELS)]
+  def helper(x):
+    x = x.sample.values
+    x = np.array(x)
+    x = (x.reshape((-1,channels,size,size))  # shape (N,C,H,W)
+              .transpose((0,2,3,1))  # shape (N,H,W,C)
+              .astype(np.float32))
+    mu = np.mean(x, axis=(0,1,2))
+    return mu
+  means =
+  means = np.array(means)
+  means = np.mean(means, axis=0)
+  return means
+def gen_class_weights(df):
+  """Generate class weights to even out the class distribution during training."""
+  class_counts_df ="tumor_score").groupBy("tumor_score").count()
+  class_counts = {row["tumor_score"]:row["count"] for row in class_counts_df.collect()}
+  max_count = max(class_counts.values())
+  class_weights = {k-1:max_count/v for k,v in class_counts.items()}
+  return class_weights
+def read_data(spark_session, filename_template, sample_size, channels, sample_prob, normalize_class_distribution, seed):
+  """Read and return training & validation Spark DataFrames."""
+  # TODO: Clean this function up!!!
+  assert channels in (1, 3)
+  grayscale = False if channels == 3 else True
+  # Sample (Optional)
+  if sample_prob < 1:
+    try:
+      # Ex: `train_0.01_sample_256.parquet`
+      sampled_filename_template = filename_template.format("{}_sample_".format(sample_prob), sample_size, "_grayscale" if grayscale else "")
+      filename = os.path.join("data", sampled_filename_template)
+      df =
+    except:  # Pre-sampled DataFrame not available
+      filename = os.path.join("data", filename_template.format("", sample_size, "_grayscale" if grayscale else ""))
+      df =
+      p = sample_prob  # sample percentage
+      if normalize_class_distribution:
+        # stratified sample with even class proportions
+        n = df.count()  # num examples
+        K = 3  # num classes
+        s = p * n  # num examples in p% sample, as a fraction
+        s_k = s / K  # num examples per class in evenly-distributed p% sample, as fraction
+        class_counts_df ="tumor_score").groupBy("tumor_score").count()
+        class_counts = {row["tumor_score"]:row["count"] for row in class_counts_df.collect()}
+        ps = {k:s_k/v for k,v in class_counts.items()}
+        df = df.sampleBy("tumor_score", fractions=ps, seed=seed)
+      else:
+        # stratified sample maintaining the original class proportions
+        df = df.sampleBy("tumor_score", fractions={1: p, 2: p, 3: p}, seed=seed)
+      # TODO: Determine if coalesce actually provides a perf benefit on Spark 2.x
+      #train_df.cache(), val_df.cache()  # cache here, or coalesce will hang
+      # tc = train_df.count()
+      # vc = val_df.count()
+      #
+      # # Reduce num partitions to ideal size (~128 MB/partition, determined empirically)
+      # current_tr_parts = train_df.rdd.getNumPartitions()
+      # current_val_parts = train_df.rdd.getNumPartitions()
+      # ex_mb = sample_size * sample_size * channels * 8 / 1024 / 1024  # size of one example in MB
+      # ideal_part_size_mb = 128  # 128 MB partitions sizes are empirically ideal
+      # ideal_exs_per_part = round(ideal_part_size_mb / ex_mb)
+      # tr_parts = round(tc / ideal_exs_per_part)
+      # val_parts = round(vc / ideal_exs_per_part)
+      # if current_tr_parts > tr_parts:
+      #   train_df = train_df.coalesce(tr_parts)
+      # if current_val_parts > val_parts:
+      #   val_df = val_df.coalesce(val_parts)
+      # train_df.cache(), val_df.cache()
+  else:
+    # Read in data
+    filename = os.path.join("data", filename_template.format("", sample_size, "_grayscale" if grayscale else ""))
+    df =
+  return df
+def read_train_data(spark_session, sample_size, channels, sample_prob=1, normalize_class_distribution=False, seed=42):
+  """Read training Spark DataFrame."""
+  filename = "train_{}{}{}_updated.parquet"
+  train_df = read_data(spark_session, filename, sample_size, channels, sample_prob, normalize_class_distribution, seed)
+  return train_df
+def read_val_data(spark_session, sample_size, channels, sample_prob=1, normalize_class_distribution=False, seed=42):
+  """Read validation Spark DataFrame."""
+  filename = "val_{}{}{}_updated.parquet"
+  train_df = read_data(spark_session, filename, sample_size, channels, sample_prob, normalize_class_distribution, seed)
+  return train_df
+# Utils for creating asynchronous queuing batch generators
+# TODO: Add comments to these functions
+def fill_partition_num_queue(partition_num_queue, num_partitions, stop_event):
+  partition_num_queue.cancel_join_thread()
+  while not stop_event.is_set():
+    for i in range(num_partitions):
+      partition_num_queue.put(i)
+def fill_partition_queue(partition_queue, partition_num_queue, rdd, stop_event):
+  partition_queue.cancel_join_thread()
+  while not stop_event.is_set():
+    # py4j has some issues with imports with first starting.
+    try:
+      partition_num = partition_num_queue.get()
+      partition = rdd.context.runJob(rdd, lambda x: x, [partition_num])
+      partition_queue.put(partition)
+    except (AttributeError, py4j.protocol.Py4JError, Exception) as err:
+      print("error: {}".format(err))
+def fill_row_queue(row_queue, partition_queue, stop_event):
+  row_queue.cancel_join_thread()
+  while not stop_event.is_set():
+    rows = partition_queue.get()
+    for row in rows:
+      row_queue.put(row)
+def gen_batch(row_queue, batch_size):
+  while True:
+    features = []
+    labels = []
+    for i in range(batch_size):
+      row = row_queue.get()
+      features.append(row.sample.values)
+      labels.append(row.tumor_score)
+    x_batch = np.array(features).astype(np.uint8)
+    y_batch = np.array(labels).astype(np.uint8)
+    yield x_batch, y_batch
+def create_batch_generator(
+    rdd, batch_size=32, num_partition_threads=32, num_row_processes=16,
+    partition_num_queue_size=128, partition_queue_size=16, row_queue_size=2048):
+  """
+  Create a  multiprocess batch generator.
+  This creates a generator that uses processes and threads to create a
+  pipeline that asynchronously fetches data from Spark, filling a set
+  of queues, while yielding batches.  The goal here is to amortize the
+  time needed to fetch data from Spark so that downstream consumers
+  are saturated.
+  """
+  #rdd.cache()
+  partition_num_queue = mp.Queue(partition_num_queue_size)
+  partition_queue = mp.Queue(partition_queue_size)
+  row_queue = mp.Queue(row_queue_size)
+  num_partitions = rdd.getNumPartitions()
+  stop_event = mp.Event()
+  partition_num_process = mp.Process(target=fill_partition_num_queue, args=(partition_num_queue, num_partitions, stop_event), daemon=True)
+  partition_threads = [threading.Thread(target=fill_partition_queue, args=(partition_queue, partition_num_queue, rdd, stop_event), daemon=True) for _ in range(num_partition_threads)]
+  row_processes = [mp.Process(target=fill_row_queue, args=(row_queue, partition_queue, stop_event), daemon=True) for _ in range(num_row_processes)]
+  ps = [partition_num_process] + row_processes + partition_threads
+  queues = [partition_num_queue, partition_queue, row_queue]
+  for p in ps:
+    p.start()
+  generator = gen_batch(row_queue, batch_size)
+  return generator, ps, queues, stop_event
+def stop(processes, stop_event):
+  """Stop queuing processes."""
+  stop_event.set()
+  for p in processes:
+    if isinstance(p, mp.Process):
+      p.terminate()
+  mp.active_children()  # Use to join the killed processes above.
diff --git a/projects/breast_cancer/breastcancer/softmax_clf.dml b/projects/breast_cancer/breastcancer/softmax_clf.dml
new file mode 100644
index 0000000..35fd545
--- /dev/null
+++ b/projects/breast_cancer/breastcancer/softmax_clf.dml
@@ -0,0 +1,207 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ * Breast Cancer Softmax Model
+ */
+# Imports
+source("nn/layers/affine.dml") as affine
+source("nn/layers/cross_entropy_loss.dml") as cross_entropy_loss
+source("nn/layers/softmax.dml") as softmax
+#source("nn/optim/adam.dml") as adam
+source("nn/optim/sgd_nesterov.dml") as sgd_nesterov
+train = function(matrix[double] X, matrix[double] Y,
+                 matrix[double] X_val, matrix[double] Y_val,
+                 double lr, double mu, double decay,
+                 int batch_size, int epochs, int log_interval)
+    return (matrix[double] W, matrix[double] b) {
+  /*
+   * Trains a softmax classifier.
+   *
+   * The input matrix, X, has N examples, each with D features.
+   * The targets, Y, have K classes, and are one-hot encoded.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, D).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - X_val: Input validation data matrix, of shape (N, C*Hin*Win).
+   *  - Y_val: Target validation matrix, of shape (N, K).
+   *  - lr: Learning rate.
+   *  - mu: Momentum value.
+   *      Typical values are in the range of [0.5, 0.99], usually
+   *      started at the lower end and annealed towards the higher end.
+   *  - decay: Learning rate decay rate.
+   *  - batch_size: Size of mini-batches to train on.
+   *  - epochs: Total number of full training loops over the full data set.
+   *  - log_interval: Interval, in iterations, between log outputs.
+   *
+   * Outputs:
+   *  - W: Weights (parameters) matrix, of shape (D, K).
+   *  - b: Biases vector, of shape (1, K).
+   */
+  N = nrow(Y)  # num examples
+  D = ncol(X)  # num features
+  K = ncol(Y)  # num classes
+  # Create softmax classifier:
+  # affine -> softmax
+  [W, b] = affine::init(D, K)
+  W = W / sqrt(2.0/(D)) * sqrt(1/(D))
+  # Initialize SGD w/ Nesterov momentum optimizer
+  vW = sgd_nesterov::init(W)  # optimizer momentum state for W
+  vb = sgd_nesterov::init(b)  # optimizer momentum state for b
+  #[mW, vW] = adam::init(W)  # optimizer 1st & 2nd moment state for W
+  #[mb, vb] = adam::init(b)  # optimizer 1st & 2nd moment state for b
+  # Starting validation loss & accuracy
+  probs_val = predict(X_val, W, b)
+  loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+  accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+  # Output results
+  print("Start: Val Loss: " + loss_val + ", Val Accuracy: " + accuracy_val)
+  # Optimize
+  print("Starting optimization")
+  iters = ceil(N / batch_size)
+  for (e in 1:epochs) {
+    for(i in 1:iters) {
+      # Get next batch
+      beg = ((i-1) * batch_size) %% N + 1
+      end = min(N, beg + batch_size - 1)
+      #print("Epoch: " + e + ", Iter: " + i + ", X[" + beg + ":" + end + ",]")
+      X_batch = X[beg:end,]
+      Y_batch = Y[beg:end,]
+      # Compute forward pass
+      ## affine & softmax:
+      out = affine::forward(X_batch, W, b)
+      probs = softmax::forward(out)
+      # Compute backward pass
+      ## loss:
+      dprobs = cross_entropy_loss::backward(probs, Y_batch)
+      ## affine & softmax:
+      dout = softmax::backward(dprobs, out)
+      [dX_batch, dW, db] = affine::backward(dout, X_batch, W, b)
+      # Optimize with SGD w/ Nesterov momentum
+      [W, vW] = sgd_nesterov::update(W, dW, lr, mu, vW)
+      [b, vb] = sgd_nesterov::update(b, db, lr, mu, vb)
+      #[W, mW, vW] = adam::update(W, dW, lr, 0.9, 0.999, 1e-8, e*i-1, mW, vW)
+      #[b, mb, vb] = adam::update(b, db, lr, 0.9, 0.999, 1e-8, e*i-1, mb, vb)
+      # Compute loss & accuracy for training & validation data every `log_interval` iterations.
+      if (i %% log_interval == 0) {
+        #print("Eval time! - i: " + i)
+        # Compute training loss & accuracy
+        loss = cross_entropy_loss::forward(probs, Y_batch)
+        accuracy = mean(rowIndexMax(probs) == rowIndexMax(Y_batch))
+        # Compute validation loss & accuracy
+        probs_val = predict(X_val, W, b)
+        loss_val = cross_entropy_loss::forward(probs_val, Y_val)
+        accuracy_val = mean(rowIndexMax(probs_val) == rowIndexMax(Y_val))
+        # Output results
+        print("Epoch: " + e + "/" + epochs + ", Iter: " + i + "/" + iters
+              + ", Train Loss: " + loss + ", Train Accuracy: " + accuracy + ", Val Loss: "
+              + loss_val + ", Val Accuracy: " + accuracy_val + ", lr: " + lr + ", mu " + mu)
+      }
+    }
+    # Anneal momentum towards 0.999
+    mu = mu + (0.999 - mu)/(1+epochs-e)
+    # Decay learning rate
+    lr = lr * decay
+  }
+predict = function(matrix[double] X, matrix[double] W, matrix[double] b)
+    return (matrix[double] probs) {
+  /*
+   * Computes the class probability predictions of a softmax classifier.
+   *
+   * The input matrix, X, has N examples, each with D features.
+   *
+   * Inputs:
+   *  - X: Input data matrix, of shape (N, D).
+   *  - W: Weights (parameters) matrix, of shape (D, K).
+   *  - b: Biases vector, of shape (1, K).
+   *
+   * Outputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   */
+  N = nrow(X)  # num examples
+  K = ncol(W)  # num classes
+  # Compute forward pass
+  ## affine & softmax:
+  out = affine::forward(X, W, b)
+  probs = softmax::forward(out)
+eval = function(matrix[double] probs, matrix[double] Y)
+    return (double loss, double accuracy) {
+  /*
+   * Evaluates a softmax classifier.
+   *
+   * The probs matrix contains the class probability predictions
+   * of K classes over N examples.  The targets, Y, have K classes,
+   * and are one-hot encoded.
+   *
+   * Inputs:
+   *  - probs: Class probabilities, of shape (N, K).
+   *  - Y: Target matrix, of shape (N, K).
+   *
+   * Outputs:
+   *  - loss: Scalar loss, of shape (1).
+   *  - accuracy: Scalar accuracy, of shape (1).
+   */
+  # Compute loss & accuracy
+  loss = cross_entropy_loss::forward(probs, Y)
+  correct_pred = rowIndexMax(probs) == rowIndexMax(Y)
+  accuracy = mean(correct_pred)
+generate_dummy_data = function()
+    return (matrix[double] X, matrix[double] Y, int C, int Hin, int Win) {
+  /*
+   * Generate a dummy dataset similar to the breast cancer dataset.
+   *
+   * Outputs:
+   *  - X: Input data matrix, of shape (N, D).
+   *  - Y: Target matrix, of shape (N, K).
+   *  - C: Number of input channels (dimensionality of input depth).
+   *  - Hin: Input height.
+   *  - Win: Input width.
+   */
+  # Generate dummy input data
+  N = 1024  # num examples
+  C = 3  # num input channels
+  Hin = 256  # input height
+  Win = 256  # input width
+  T = 10  # num targets
+  X = rand(rows=N, cols=C*Hin*Win, pdf="normal")
+  classes = round(rand(rows=N, cols=1, min=1, max=T, pdf="uniform"))
+  Y = table(seq(1, N), classes)  # one-hot encoding