You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@singa.apache.org by zh...@apache.org on 2021/05/21 13:23:21 UTC

[singa] branch dev updated: autograd for mnist in cifar_distributed_cnn

This is an automated email from the ASF dual-hosted git repository.

zhaojing pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/singa.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8473014  autograd for mnist in cifar_distributed_cnn
     new fb13e50  Merge pull request #853 from naili-xing/cifar-distributed-cnn-new-folder
8473014 is described below

commit 8473014485605efec306735fc62e65a33dd69c99
Author: nailixing <xi...@gmail.com>
AuthorDate: Fri May 21 19:38:15 2021 +0800

    autograd for mnist in cifar_distributed_cnn
---
 .../cifar_distributed_cnn/autograd/mnist_cnn.py    | 304 +++++++++++++++++++++
 .../cifar_distributed_cnn/autograd/mnist_dist.py   |  25 ++
 .../autograd/mnist_multiprocess.py                 |  39 +++
 3 files changed, 368 insertions(+)

diff --git a/examples/cifar_distributed_cnn/autograd/mnist_cnn.py b/examples/cifar_distributed_cnn/autograd/mnist_cnn.py
new file mode 100644
index 0000000..16752ce
--- /dev/null
+++ b/examples/cifar_distributed_cnn/autograd/mnist_cnn.py
@@ -0,0 +1,304 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from singa import singa_wrap as singa
+from singa import autograd
+from singa import layer
+from singa import tensor
+from singa import device
+from singa import opt
+import numpy as np
+import os
+import sys
+import gzip
+import codecs
+import time
+
+
+class CNN:
+
+    def __init__(self):
+        self.conv1 = layer.Conv2d(1, 20, 5, padding=0)
+        self.conv2 = layer.Conv2d(20, 50, 5, padding=0)
+        self.linear1 = layer.Linear(4 * 4 * 50, 500)
+        self.linear2 = layer.Linear(500, 10)
+        self.pooling1 = layer.MaxPool2d(2, 2, padding=0)
+        self.pooling2 = layer.MaxPool2d(2, 2, padding=0)
+        self.relu1 = layer.ReLU()
+        self.relu2 = layer.ReLU()
+        self.relu3 = layer.ReLU()
+        self.flatten = layer.Flatten()
+
+    def forward(self, x):
+        y = self.conv1(x)
+        y = self.relu1(y)
+        y = self.pooling1(y)
+        y = self.conv2(y)
+        y = self.relu2(y)
+        y = self.pooling2(y)
+        y = self.flatten(y)
+        y = self.linear1(y)
+        y = self.relu3(y)
+        y = self.linear2(y)
+        return y
+
+
+def check_dataset_exist(dirpath):
+    if not os.path.exists(dirpath):
+        print(
+            'The MNIST dataset does not exist. Please download the mnist dataset using download_mnist.py (e.g. python3 download_mnist.py)'
+        )
+        sys.exit(0)
+    return dirpath
+
+
+def load_dataset():
+    train_x_path = '/tmp/train-images-idx3-ubyte.gz'
+    train_y_path = '/tmp/train-labels-idx1-ubyte.gz'
+    valid_x_path = '/tmp/t10k-images-idx3-ubyte.gz'
+    valid_y_path = '/tmp/t10k-labels-idx1-ubyte.gz'
+
+    train_x = read_image_file(check_dataset_exist(train_x_path)).astype(
+        np.float32)
+    train_y = read_label_file(check_dataset_exist(train_y_path)).astype(
+        np.float32)
+    valid_x = read_image_file(check_dataset_exist(valid_x_path)).astype(
+        np.float32)
+    valid_y = read_label_file(check_dataset_exist(valid_y_path)).astype(
+        np.float32)
+    return train_x, train_y, valid_x, valid_y
+
+
+def read_label_file(path):
+    with gzip.open(path, 'rb') as f:
+        data = f.read()
+        assert get_int(data[:4]) == 2049
+        length = get_int(data[4:8])
+        parsed = np.frombuffer(data, dtype=np.uint8, offset=8).reshape((length))
+        return parsed
+
+
+def get_int(b):
+    return int(codecs.encode(b, 'hex'), 16)
+
+
+def read_image_file(path):
+    with gzip.open(path, 'rb') as f:
+        data = f.read()
+        assert get_int(data[:4]) == 2051
+        length = get_int(data[4:8])
+        num_rows = get_int(data[8:12])
+        num_cols = get_int(data[12:16])
+        parsed = np.frombuffer(data, dtype=np.uint8, offset=16).reshape(
+            (length, 1, num_rows, num_cols))
+        return parsed
+
+
+def to_categorical(y, num_classes):
+    y = np.array(y, dtype="int")
+    n = y.shape[0]
+    categorical = np.zeros((n, num_classes))
+    categorical[np.arange(n), y] = 1
+    categorical = categorical.astype(np.float32)
+    return categorical
+
+
+def accuracy(pred, target):
+    y = np.argmax(pred, axis=1)
+    t = np.argmax(target, axis=1)
+    a = y == t
+    return np.array(a, "int").sum()
+
+
+# Function to all reduce NUMPY accuracy and loss from multiple devices
+def reduce_variable(variable, dist_opt, reducer):
+    reducer.copy_from_numpy(variable)
+    dist_opt.all_reduce(reducer.data)
+    dist_opt.wait()
+    output = tensor.to_numpy(reducer)
+    return output
+
+
+# Function to sychronize SINGA TENSOR initial model parameters
+def synchronize(tensor, dist_opt):
+    dist_opt.all_reduce(tensor.data)
+    dist_opt.wait()
+    tensor /= dist_opt.world_size
+
+
+# Data augmentation
+def augmentation(x, batch_size):
+    xpad = np.pad(x, [[0, 0], [0, 0], [4, 4], [4, 4]], 'symmetric')
+    for data_num in range(0, batch_size):
+        offset = np.random.randint(8, size=2)
+        x[data_num, :, :, :] = xpad[data_num, :, offset[0]:offset[0] + 28,
+                                    offset[1]:offset[1] + 28]
+        if_flip = np.random.randint(2)
+        if (if_flip):
+            x[data_num, :, :, :] = x[data_num, :, :, ::-1]
+    return x
+
+
+# Data partition
+def data_partition(dataset_x, dataset_y, global_rank, world_size):
+    data_per_rank = dataset_x.shape[0] // world_size
+    idx_start = global_rank * data_per_rank
+    idx_end = (global_rank + 1) * data_per_rank
+    return dataset_x[idx_start:idx_end], dataset_y[idx_start:idx_end]
+
+
+def train_mnist_cnn(DIST=False,
+                    local_rank=None,
+                    world_size=None,
+                    nccl_id=None,
+                    spars=0,
+                    topK=False,
+                    corr=True):
+
+    # Define the hypermeters for the mnist_cnn
+    max_epoch = 10
+    batch_size = 64
+    sgd = opt.SGD(lr=0.005, momentum=0.9, weight_decay=1e-5)
+
+    # Prepare training and valadiation data
+    train_x, train_y, test_x, test_y = load_dataset()
+    IMG_SIZE = 28
+    num_classes = 10
+    train_y = to_categorical(train_y, num_classes)
+    test_y = to_categorical(test_y, num_classes)
+
+    # Normalization
+    train_x = train_x / 255
+    test_x = test_x / 255
+
+    if DIST:
+        # For distributed GPU training
+        sgd = opt.DistOpt(sgd,
+                          nccl_id=nccl_id,
+                          local_rank=local_rank,
+                          world_size=world_size)
+        dev = device.create_cuda_gpu_on(sgd.local_rank)
+
+        # Dataset partition for distributed training
+        train_x, train_y = data_partition(train_x, train_y, sgd.global_rank,
+                                          sgd.world_size)
+        test_x, test_y = data_partition(test_x, test_y, sgd.global_rank,
+                                        sgd.world_size)
+        world_size = sgd.world_size
+    else:
+        # For single GPU
+        dev = device.create_cuda_gpu()
+        world_size = 1
+
+    # Create model
+    model = CNN()
+
+    tx = tensor.Tensor((batch_size, 1, IMG_SIZE, IMG_SIZE), dev, tensor.float32)
+    ty = tensor.Tensor((batch_size, num_classes), dev, tensor.int32)
+    num_train_batch = train_x.shape[0] // batch_size
+    num_test_batch = test_x.shape[0] // batch_size
+    idx = np.arange(train_x.shape[0], dtype=np.int32)
+
+    if DIST:
+        #Sychronize the initial parameters
+        autograd.training = True
+        x = np.random.randn(batch_size, 1, IMG_SIZE,
+                            IMG_SIZE).astype(np.float32)
+        y = np.zeros(shape=(batch_size, num_classes), dtype=np.int32)
+        tx.copy_from_numpy(x)
+        ty.copy_from_numpy(y)
+        out = model.forward(tx)
+        loss = autograd.softmax_cross_entropy(out, ty)
+        for p, g in autograd.backward(loss):
+            synchronize(p, sgd)
+
+    # Training and evaulation loop
+    for epoch in range(max_epoch):
+        start_time = time.time()
+        np.random.shuffle(idx)
+
+        if ((DIST == False) or (sgd.global_rank == 0)):
+            print('Starting Epoch %d:' % (epoch))
+
+        # Training phase
+        autograd.training = True
+        train_correct = np.zeros(shape=[1], dtype=np.float32)
+        test_correct = np.zeros(shape=[1], dtype=np.float32)
+        train_loss = np.zeros(shape=[1], dtype=np.float32)
+
+        for b in range(num_train_batch):
+            x = train_x[idx[b * batch_size:(b + 1) * batch_size]]
+            x = augmentation(x, batch_size)
+            y = train_y[idx[b * batch_size:(b + 1) * batch_size]]
+            tx.copy_from_numpy(x)
+            ty.copy_from_numpy(y)
+            out = model.forward(tx)
+            loss = autograd.softmax_cross_entropy(out, ty)
+            train_correct += accuracy(tensor.to_numpy(out), y)
+            train_loss += tensor.to_numpy(loss)[0]
+            if DIST:
+                if (spars == 0):
+                    sgd.backward_and_update(loss, threshold=50000)
+                else:
+                    sgd.backward_and_sparse_update(loss,
+                                                   spars=spars,
+                                                   topK=topK,
+                                                   corr=corr)
+            else:
+                sgd(loss)
+
+        if DIST:
+            # Reduce the evaluation accuracy and loss from multiple devices
+            reducer = tensor.Tensor((1,), dev, tensor.float32)
+            train_correct = reduce_variable(train_correct, sgd, reducer)
+            train_loss = reduce_variable(train_loss, sgd, reducer)
+
+        # Output the training loss and accuracy
+        if ((DIST == False) or (sgd.global_rank == 0)):
+            print('Training loss = %f, training accuracy = %f' %
+                  (train_loss, train_correct /
+                   (num_train_batch * batch_size * world_size)),
+                  flush=True)
+
+        # Evaluation phase
+        autograd.training = False
+        for b in range(num_test_batch):
+            x = test_x[b * batch_size:(b + 1) * batch_size]
+            y = test_y[b * batch_size:(b + 1) * batch_size]
+            tx.copy_from_numpy(x)
+            ty.copy_from_numpy(y)
+            out_test = model.forward(tx)
+            test_correct += accuracy(tensor.to_numpy(out_test), y)
+
+        if DIST:
+            # Reduce the evaulation accuracy from multiple devices
+            test_correct = reduce_variable(test_correct, sgd, reducer)
+
+        # Output the evaluation accuracy
+        if ((DIST == False) or (sgd.global_rank == 0)):
+            print('Evaluation accuracy = %f, Elapsed Time = %fs' %
+                  (test_correct / (num_test_batch * batch_size * world_size),
+                   time.time() - start_time),
+                  flush=True)
+
+
+if __name__ == '__main__':
+
+    DIST = False
+    train_mnist_cnn(DIST=DIST)
diff --git a/examples/cifar_distributed_cnn/autograd/mnist_dist.py b/examples/cifar_distributed_cnn/autograd/mnist_dist.py
new file mode 100644
index 0000000..3586127
--- /dev/null
+++ b/examples/cifar_distributed_cnn/autograd/mnist_dist.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from mnist_cnn import *
+
+if __name__ == '__main__':
+
+    DIST = True
+    train_mnist_cnn(DIST=DIST)
diff --git a/examples/cifar_distributed_cnn/autograd/mnist_multiprocess.py b/examples/cifar_distributed_cnn/autograd/mnist_multiprocess.py
new file mode 100644
index 0000000..f51344f
--- /dev/null
+++ b/examples/cifar_distributed_cnn/autograd/mnist_multiprocess.py
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from mnist_cnn import *
+import multiprocessing
+import sys
+
+if __name__ == '__main__':
+
+    # Generate a NCCL ID to be used for collective communication
+    nccl_id = singa.NcclIdHolder()
+
+    # Number of GPUs to be used
+    world_size = int(sys.argv[1])
+
+    process = []
+    for local_rank in range(0, world_size):
+        process.append(
+            multiprocessing.Process(target=train_mnist_cnn,
+                                    args=(True, local_rank, world_size, nccl_id)))
+
+    for p in process:
+        p.start()