Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/02/24 06:03:33 UTC

[carbondata] branch master updated: [CARBONDATA-3695] Integrating deep learning framework PyTorch

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 10f9e99  [CARBONDATA-3695]  Integrating deep learning framework PyTorch
10f9e99 is described below

commit 10f9e991d5f97ed0f8b2b1cd7bf9d0fceb9d20d1
Author: xubo245 <60...@qq.com>
AuthorDate: Wed Feb 12 23:38:18 2020 +0800

    [CARBONDATA-3695]  Integrating deep learning framework PyTorch
    
    Why is this PR needed?
    AI model training is increasingly popular. Many AI frameworks currently train on raw data files or row-format data files, which cannot provide the projection, filtering, and fast-scan capabilities of a columnar store. If CarbonData supports AI frameworks, it can speed up model training by increasing IO throughput and give AI developers more flexible training-set selection.
    
    What changes were proposed in this PR?
    
    Does this PR introduce any user interface change?
    Yes. Added a new interface for PyTorch:
    
    def make_data_loader(reader, batch_size=1, collate_fn=decimal_friendly_collate):
    
    Is any new testcase added?
    Yes
    pytorch_example_carbon_unified_api.py, among others.
    
    This closes #3617
---
 python/README.md                                   | 121 +++++++++++---
 python/pycarbon/integration/pytorch.py             | 138 ++++++++++++++++
 python/pycarbon/reader.py                          |  14 ++
 .../pytorch_hello_world_carbon.py                  |  53 ++++++
 .../pytorch_hello_world_carbon.py                  |  52 ++++++
 .../pytorch_example_carbon.py                      | 155 ++++++++++++++++++
 .../pytorch_example_carbon_unified_api.py          | 177 +++++++++++++++++++++
 7 files changed, 687 insertions(+), 23 deletions(-)
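
The `make_data_loader` interface named in the commit message wraps a reader and yields collated batches. The batching loop of the new `DataLoader` class can be sketched without PyTorch or CarbonData installed. This is a minimal illustration, not the committed implementation: `Row`, `list_collate`, and `SketchLoader` are hypothetical names, a plain list stands in for a PyCarbon reader, and the collate step builds lists rather than tensors.

```python
from collections import namedtuple

# Hypothetical row type standing in for what a PyCarbon reader yields.
Row = namedtuple("Row", ["id", "value"])


def list_collate(batch):
    """Stand-in collate (assumption): turns a list of dicts into a dict of lists."""
    return {key: [row[key] for row in batch] for key in batch[0]}


class SketchLoader(object):
    """Mimics the batching loop of the DataLoader added in this commit."""

    def __init__(self, reader, batch_size=1, collate_fn=list_collate):
        self.reader = reader
        self.batch_size = batch_size
        self.collate_fn = collate_fn

    def __iter__(self):
        batch = []
        for row in self.reader:
            # namedtuple -> dict, so the yielded batches are dicts as well
            batch.append(row._asdict())
            if len(batch) == self.batch_size:
                yield self.collate_fn(batch)
                batch = []
        if batch:
            # the final batch may hold fewer than batch_size rows
            yield self.collate_fn(batch)


rows = [Row(id=i, value=i * 0.5) for i in range(5)]
batches = list(SketchLoader(rows, batch_size=2))
for b in batches:
    print(b)
```

Five rows with `batch_size=2` produce three batches, the last of which holds a single row. Iteration ends when the reader is exhausted, so the number of epochs is controlled by the reader rather than by the loader.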

diff --git a/python/README.md b/python/README.md
index 39f6808..3557e1b 100644
--- a/python/README.md
+++ b/python/README.md
@@ -14,7 +14,7 @@ $ pip install . --user
 ## how to use
 
 if you have a CarbonData dataset, you can use PyCarbon to read data. For the generation of CarbonData dataset, you can see the examples:
-`generate_dataset_carbon.py` in test/hello_world and `generate_pycarbon_dataset.py` in test/hello_world.
+`generate_dataset_carbon.py` in tests/hello_world/dataset_with_unischema.
 But user should do some config first:
 
  - config pyspark and add carbon assembly jar to pyspark/jars folder, which can be compiled from CarbonData project.
@@ -22,44 +22,81 @@ But user should do some config first:
 
  - set JAVA_HOME, PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON in you system environment.
 
-#### PySpark and SQL
-
-#### Generating the data:
+### Generating the data:
 ```
 
     python generate_pycarbon_mnist.py --carbon-sdk-path  /your_path/carbondata/store/sdk/target/carbondata-sdk.jar 
 
 ```
-##### Part of code:
+#### Part of code:
+```
+      spark.createDataFrame(sql_rows, MnistSchema.as_spark_schema()) \
+        .coalesce(carbon_files_count) \
+        .write \
+        .save(path=dset_output_url, format='carbon')
+```
+some details are illustrated in `generate_pycarbon_mnist.py <https://github.com/apache/carbondata/blob/master/python/pycarbon/tests/mnist/dataset_with_unischema/generate_pycarbon_mnist.py>` in tests/mnist/dataset_with_unischema.
+
+### PyCarbon Reader API
+
 ```
-    # Create a dataframe object from carbon files
-    spark.sql("create table readcarbon using carbon location '" + str(dataset_path) + "'")
-    dataframe = spark.sql("select * from readcarbon")
 
-    # Show a schema
-    dataframe.printSchema()
+def make_reader(dataset_url=None,
+                workers_count=10,
+                results_queue_size=100,
+                num_epochs=1,
+                shuffle=True,
+                is_batch=True
+                ):
+  """
+  A unified API for datasets in different data formats.
+
+  :param dataset_url: a file path or a URL to a carbon directory,
+      e.g. ``'hdfs://some_hdfs_cluster/user/yevgeni/carbon8'``, or ``'file:///tmp/mydataset'``
+      or ``'s3a://bucket/mydataset'``.
+  :param workers_count: An int for the number of workers to use in the reader pool. This is used only for the
+      thread or process pool. Defaults to 10
+  :param results_queue_size: Size of the results queue to store prefetched rows. Currently only applicable to
+      thread reader pool type.
+  :param shuffle: Whether to shuffle partitions (the order in which full partitions are read)
+  :param num_epochs: An epoch is a single pass over all rows in the dataset. Setting ``num_epochs`` to
+      ``None`` will result in an infinite number of epochs.
+  :param is_batch: whether to return batches of records (True) or single records (False) (default: True)
+  :return: A :class:`Reader` object
+  """
+```
 
-    # Count all
-    dataframe.count()
+### TensorFlow Dataset API
+```python
 
-    # Show just some columns
-    dataframe.select('id').show()
+def make_dataset(reader):
+  """
+  Creates a `tensorflow.data.Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ object from a PyCarbon reader.
 
-    # Also use a standard SQL to query a dataset
-    spark.sql('SELECT count(id) from carbon.`{}` '.format(dataset_url)).collect()
+  :param reader: An instance of PyCarbon Reader object that would serve as a data source.
+  :return: A ``tf.data.Dataset`` instance.
+  """
 ```
-some details are illustrated in `pyspark_hello_world_carbon.py` in test/hello_world.
 
-#### TensorFlow Dataset API
+### TensorFlow Tensor API
+```python
 
+def make_tensor(reader):
+  """Bridges between python-only interface of the Reader (next(Reader)) and tensorflow world.
 
-##### Running train and test based on mnist:
+  This function returns a named tuple of tensors from the dataset.
+
+  :param reader: An instance of Reader object used as the data source
+  """
+```
+
+#### Running train and test based on mnist:
 
 ```
     python  tf_example_carbon_unified_api.py --carbon-sdk-path  /your_path/carbondata/store/sdk/target/carbondata-sdk.jar 
 
 ```
-##### Part or code:
+#### Part of code:
 ```
     with make_reader('file:///some/localpath/a_dataset') as reader:
         dataset = make_dataset(reader)
@@ -69,10 +106,10 @@ some details are illustrated in `pyspark_hello_world_carbon.py` in test/hello_wo
             sample = sess.run(tensor)
             print(sample.id)
 
-some details are illustrated in `tf_example_carbon_unified_api.py` in test/mnist. 
 ```
+some details are illustrated in `tf_example_carbon_unified_api.py <https://github.com/apache/carbondata/blob/master/python/pycarbon/tests/mnist/dataset_with_unischema/tf_example_carbon_unified_api.py>` in tests/mnist. 
 
-#####  Part of result:
+####  Part of result:
 
 ```
 2020-01-20 21:12:31 INFO  DictionaryBasedVectorResultCollector:72 - Direct pagewise vector fill collector is used to scan and collect the data
@@ -91,4 +128,42 @@ After 90 training iterations, the accuracy of the model is: 0.80
 After 99 training iterations, the accuracy of the model is: 0.79
 all time: 185.28250288963318
 Finish
-```
\ No newline at end of file
+```
+
+### PyTorch Dataset API
+
+```python
+def make_data_loader(reader, batch_size=1, collate_fn=decimal_friendly_collate):
+  """
+  Initializes a data loader object, with a default collate.
+
+  Number of epochs is defined by the configuration of the reader argument.
+
+  :param reader: PyCarbon Reader instance
+  :param batch_size: the number of items to return per batch; factored into the len() of this reader
+  """
+```
+
+####  Part of code:
+```python
+
+  with make_data_loader(make_reader('{}/train'.format(args.dataset_url), is_batch=False, num_epochs=reader_epochs,
+                                      transform_spec=transform),
+                          batch_size=args.batch_size) as train_loader:
+      train(model, device, train_loader, args.log_interval, optimizer, epoch)
+      
+```
+some details are illustrated in `pytorch_example_carbon_unified_api.py <https://github.com/apache/carbondata/blob/master/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon_unified_api.py>` in tests/mnist. 
+
+####  Part of result:
+```
+Train Epoch: 10 [55680]	Loss: 0.255295
+Train Epoch: 10 [56320]	Loss: 0.132586
+Train Epoch: 10 [56960]	Loss: 0.197574
+Train Epoch: 10 [57600]	Loss: 0.280921
+Train Epoch: 10 [58240]	Loss: 0.072130
+Train Epoch: 10 [58880]	Loss: 0.027580
+Train Epoch: 10 [59520]	Loss: 0.036734
+Test set: Average loss: 0.0508, Accuracy: 9843/10000 (98%)
+
+```
diff --git a/python/pycarbon/integration/pytorch.py b/python/pycarbon/integration/pytorch.py
new file mode 100644
index 0000000..21c9af7
--- /dev/null
+++ b/python/pycarbon/integration/pytorch.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""A set of PyTorch specific helper functions"""
+
+import re
+
+import collections
+import decimal
+
+import numpy as np
+from six import PY2
+from torch.utils.data.dataloader import default_collate
+
+if PY2:
+  _string_classes = basestring  # noqa: F821
+else:
+  _string_classes = (str, bytes)
+
+
+def _sanitize_pytorch_types(row_as_dict):
+  """Promotes value types in a dictionary to the types supported by PyTorch. Raises a TypeError
+  if a type cannot be promoted.
+
+  The parameter is modified in-place.
+
+  int8 -> int16; uint16 -> int32; uint32 -> int64; bool_ -> uint8;
+  numpy string_, unicode_, object arrays and None values are not supported.
+
+  :param dict[str,obj] row_as_dict: a dictionary of key-value pairs. The values types are promoted to
+      pytorch compatible.
+  :return: None
+  """
+  for name, value in row_as_dict.items():
+    # PyTorch supported types are: double, float, float16, int64, int32, and uint8
+    if isinstance(value, np.ndarray):
+      if value.dtype == np.int8:
+        row_as_dict[name] = value.astype(np.int16)
+      elif value.dtype == np.uint16:
+        row_as_dict[name] = value.astype(np.int32)
+      elif value.dtype == np.uint32:
+        row_as_dict[name] = value.astype(np.int64)
+      elif value.dtype == np.bool_:
+        row_as_dict[name] = value.astype(np.uint8)
+      elif re.search('[SaUO]', value.dtype.str):
+        raise TypeError('Pytorch does not support arrays of string classes. Found in field {}'.format(name))
+    elif isinstance(value, np.bool_):
+      row_as_dict[name] = np.uint8(value)
+    elif value is None:
+      raise TypeError('Pytorch does not support nullable fields. Found None in {}'.format(name))
+
+
+def decimal_friendly_collate(batch):
+  """A wrapper on top of ``default_collate`` function that allows decimal.Decimal types to be collated.
+
+  We use ``decimal.Decimal`` types in dataset to represent timestamps. PyTorch's ``default_collate``
+  implementation does not support collating ``decimal.Decimal`` types. ``decimal_friendly_collate`` collates
+  ``decimal.Decimal`` separately and then combines with the rest of the fields collated by a standard
+  ``default_collate``.
+
+  :param batch: A list of dictionaries to collate
+  :return: A dictionary of lists/torch.Tensor types
+  """
+
+  if isinstance(batch[0], decimal.Decimal):
+    return batch
+  elif isinstance(batch[0], collections.Mapping):
+    return {key: decimal_friendly_collate([d[key] for d in batch]) for key in batch[0]}
+  elif isinstance(batch[0], _string_classes):
+    return batch
+  elif isinstance(batch[0], collections.Sequence):
+    transposed = zip(*batch)
+    return [decimal_friendly_collate(samples) for samples in transposed]
+  else:
+    return default_collate(batch)
+
+
+class DataLoader(object):
+  """
+  A data loader adaptor for ``torch.utils.data.DataLoader``.
+
+  This class iterates and returns items from the Reader in batches.
+
+  This loader can be used as an iterator and will terminate when the reader used in the construction of the class
+  runs out of samples.
+  """
+
+  def __init__(self, reader, batch_size=1, collate_fn=decimal_friendly_collate):
+    """
+    Initializes a data loader object, with a default collate.
+
+    Number of epochs is defined by the configuration of the reader argument.
+
+    :param reader: Reader instance
+    :param batch_size: the number of items to return per batch; factored into the len() of this reader
+    :param collate_fn: an optional callable to merge a list of samples to form a mini-batch.
+    """
+    self.reader = reader
+    self.batch_size = batch_size
+    self.collate_fn = collate_fn
+
+  def __iter__(self):
+    """
+    The Data Loader iterator stops the for-loop when reader runs out of samples.
+    """
+    batch = []
+    for row in self.reader:
+      # Default collate does not work nicely on namedtuples and treat them as lists
+      # Using dict will result in the yielded structures being dicts as well
+      row_as_dict = row._asdict()
+      _sanitize_pytorch_types(row_as_dict)
+      batch.append(row_as_dict)
+      if len(batch) == self.batch_size:
+        yield self.collate_fn(batch)
+        batch = []
+    if batch:
+      yield self.collate_fn(batch)
+
+  # Functions needed to treat data loader as a context manager
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self.reader.stop()
+    self.reader.join()
diff --git a/python/pycarbon/reader.py b/python/pycarbon/reader.py
index 8b7fb3e..5f3d924 100644
--- a/python/pycarbon/reader.py
+++ b/python/pycarbon/reader.py
@@ -17,6 +17,7 @@
 from obs import ObsClient
 
 from pycarbon.core.carbon_reader import make_carbon_reader, make_batch_carbon_reader
+from pycarbon.integration.pytorch import decimal_friendly_collate, DataLoader
 from pycarbon.integration.tensorflow import TensorFlow
 
 
@@ -200,3 +201,16 @@ def make_tensor(reader, shuffling_queue_capacity=0, min_after_dequeue=0):
   """
   tensorflow = TensorFlow()
   return tensorflow.make_tensor(reader, shuffling_queue_capacity, min_after_dequeue)
+
+
+def make_data_loader(reader, batch_size=1, collate_fn=decimal_friendly_collate):
+  """
+  Initializes a data loader object, with a default collate.
+
+  Number of epochs is defined by the configuration of the reader argument.
+
+  :param reader: PyCarbon Reader instance
+  :param batch_size: the number of items to return per batch; factored into the len() of this reader
+  :param collate_fn: an optional callable to merge a list of samples to form a mini-batch.
+  """
+  return DataLoader(reader, batch_size=batch_size, collate_fn=collate_fn)
diff --git a/python/pycarbon/tests/hello_world/dataset_with_normal_schema/pytorch_hello_world_carbon.py b/python/pycarbon/tests/hello_world/dataset_with_normal_schema/pytorch_hello_world_carbon.py
new file mode 100644
index 0000000..255ed17
--- /dev/null
+++ b/python/pycarbon/tests/hello_world/dataset_with_normal_schema/pytorch_hello_world_carbon.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Minimal example of how to read samples from a dataset generated by `generate_external_dataset_carbon.py`
+using pytorch, using make_batch_carbon_reader() instead of make_carbon_reader()"""
+
+from __future__ import print_function
+
+import argparse
+import jnius_config
+
+from petastorm.pytorch import DataLoader
+
+from pycarbon.reader import make_reader
+from pycarbon.reader import make_data_loader
+
+from pycarbon.tests import DEFAULT_CARBONSDK_PATH
+
+
+def pytorch_hello_world(dataset_url='file:///tmp/carbon_external_dataset'):
+  with DataLoader(make_reader(dataset_url)) as train_loader:
+    sample = next(iter(train_loader))
+    # make_reader() defaults to is_batch=True, so each read returns a batch of rows instead of a single row
+    print("id batch: {0}".format(sample['id']))
+
+  with make_data_loader(make_reader(dataset_url)) as train_loader:
+    sample = next(iter(train_loader))
+    # make_reader() defaults to is_batch=True, so each read returns a batch of rows instead of a single row
+    print("id batch: {0}".format(sample['id']))
+
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(description='Pytorch hello world')
+  parser.add_argument('-c', '--carbon-sdk-path', type=str, default=DEFAULT_CARBONSDK_PATH,
+                      help='carbon sdk path')
+
+  args = parser.parse_args()
+
+  jnius_config.set_classpath(args.carbon_sdk_path)
+
+  pytorch_hello_world()
diff --git a/python/pycarbon/tests/hello_world/dataset_with_unischema/pytorch_hello_world_carbon.py b/python/pycarbon/tests/hello_world/dataset_with_unischema/pytorch_hello_world_carbon.py
new file mode 100644
index 0000000..a04666c
--- /dev/null
+++ b/python/pycarbon/tests/hello_world/dataset_with_unischema/pytorch_hello_world_carbon.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""Minimal example of how to read samples from a dataset generated by `generate_pycarbon_dataset.py`
+using pytorch."""
+
+from __future__ import print_function
+
+import argparse
+import jnius_config
+
+from petastorm.pytorch import DataLoader
+
+from pycarbon.reader import make_reader
+from pycarbon.reader import make_data_loader
+
+from pycarbon.tests import DEFAULT_CARBONSDK_PATH
+
+
+def pytorch_hello_world(dataset_url='file:///tmp/carbon_pycarbon_dataset'):
+  with DataLoader(make_reader(dataset_url, is_batch=False)) as train_loader:
+    sample = next(iter(train_loader))
+    print(sample['id'])
+
+  with make_data_loader(make_reader(dataset_url, is_batch=False)) as train_loader:
+    sample = next(iter(train_loader))
+    print(sample['id'])
+
+
+if __name__ == '__main__':
+  parser = argparse.ArgumentParser(description='Pytorch hello world')
+  parser.add_argument('-c', '--carbon-sdk-path', type=str, default=DEFAULT_CARBONSDK_PATH,
+                      help='carbon sdk path')
+
+  args = parser.parse_args()
+
+  jnius_config.set_classpath(args.carbon_sdk_path)
+
+  pytorch_hello_world()
diff --git a/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon.py b/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon.py
new file mode 100644
index 0000000..de2eeed
--- /dev/null
+++ b/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon.py
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import division, print_function
+
+import argparse
+
+import jnius_config
+
+import torch
+import torch.nn.functional as F
+import torch.optim as optim
+from torchvision import transforms
+
+from pycarbon.reader import make_reader
+from pycarbon.tests.mnist.dataset_with_unischema import DEFAULT_MNIST_DATA_PATH
+from pycarbon.tests import DEFAULT_CARBONSDK_PATH
+
+from petastorm.pytorch import DataLoader
+from petastorm import TransformSpec
+from examples.mnist.pytorch_example import Net
+
+def train(model, device, train_loader, log_interval, optimizer, epoch):
+  model.train()
+  for batch_idx, row in enumerate(train_loader):
+    data, target = row['image'].to(device), row['digit'].to(device)
+    optimizer.zero_grad()
+    output = model(data)
+    loss = F.nll_loss(output, target)
+    loss.backward()
+    optimizer.step()
+    if batch_idx % log_interval == 0:
+      print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
+        epoch, batch_idx * len(data), loss.item()))
+
+
+def evaluation(model, device, test_loader):
+  model.eval()
+  test_loss = 0
+  correct = 0
+  count = 0
+  with torch.no_grad():
+    for row in test_loader:
+      data, target = row['image'].to(device), row['digit'].to(device)
+      output = model(data)
+      test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
+      pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+      correct += pred.eq(target.view_as(pred)).sum().item()
+      count += data.shape[0]
+
+  test_loss /= count
+  print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
+    test_loss, correct, count, 100. * correct / count))
+
+
+def _transform_row(mnist_row):
+  # For this example, the images are stored as simpler ndarray (28,28), but the
+  # training network expects 3-dim images, hence the additional lambda transform.
+  transform = transforms.Compose([
+    transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
+    transforms.ToTensor(),
+    transforms.Normalize((0.1307,), (0.3081,))
+  ])
+  # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
+  # data or target transform, but that actually gives the user more flexibility
+  # to make the desired partial transform, as shown here.
+  result_row = {
+    'image': transform(mnist_row['image']),
+    'digit': mnist_row['digit']
+  }
+
+  return result_row
+
+
+def main():
+  # Training settings
+  parser = argparse.ArgumentParser(description='Pycarbon MNIST Example')
+  default_dataset_url = 'file://{}'.format(DEFAULT_MNIST_DATA_PATH)
+  parser.add_argument('--dataset-url', type=str,
+                      default=default_dataset_url, metavar='S',
+                      help='hdfs:// or file:/// URL to the MNIST pycarbon dataset '
+                           '(default: %s)' % default_dataset_url)
+  parser.add_argument('--batch-size', type=int, default=64, metavar='N',
+                      help='input batch size for training (default: 64)')
+  parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
+                      help='input batch size for testing (default: 1000)')
+  parser.add_argument('--epochs', type=int, default=10, metavar='N',
+                      help='number of epochs to train (default: 10)')
+  parser.add_argument('--all-epochs', action='store_true', default=False,
+                      help='train all epochs before testing accuracy/loss')
+  parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
+                      help='learning rate (default: 0.01)')
+  parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
+                      help='SGD momentum (default: 0.5)')
+  parser.add_argument('--no-cuda', action='store_true', default=False,
+                      help='disables CUDA training')
+  parser.add_argument('--seed', type=int, default=1, metavar='S',
+                      help='random seed (default: 1)')
+  parser.add_argument('--log-interval', type=int, default=10, metavar='N',
+                      help='how many batches to wait before logging training status')
+  parser.add_argument('--carbon-sdk-path', type=str, default=DEFAULT_CARBONSDK_PATH,
+                      help='carbon sdk path')
+  args = parser.parse_args()
+  use_cuda = not args.no_cuda and torch.cuda.is_available()
+
+  jnius_config.set_classpath(args.carbon_sdk_path)
+
+  torch.manual_seed(args.seed)
+
+  device = torch.device('cuda' if use_cuda else 'cpu')
+
+  model = Net().to(device)
+  optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
+
+  # Configure loop and Reader epoch for illustrative purposes.
+  # Typical training usage would use the `all_epochs` approach.
+  #
+  if args.all_epochs:
+    # Run training across all the epochs before testing for accuracy
+    loop_epochs = 1
+    reader_epochs = args.epochs
+  else:
+    # Test training accuracy after each epoch
+    loop_epochs = args.epochs
+    reader_epochs = 1
+
+  transform = TransformSpec(_transform_row, removed_fields=['idx'])
+
+  # Instantiate each pycarbon Reader with a single thread, shuffle enabled, and appropriate epoch setting
+  for epoch in range(1, loop_epochs + 1):
+    with DataLoader(make_reader('{}/train'.format(args.dataset_url), num_epochs=reader_epochs,
+                                       transform_spec=transform, is_batch=False),
+                    batch_size=args.batch_size) as train_loader:
+      train(model, device, train_loader, args.log_interval, optimizer, epoch)
+
+    with DataLoader(make_reader('{}/test'.format(args.dataset_url), num_epochs=reader_epochs,
+                                       transform_spec=transform, is_batch=False),
+                    batch_size=args.test_batch_size) as test_loader:
+      evaluation(model, device, test_loader)
+
+
+if __name__ == '__main__':
+  main()
diff --git a/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon_unified_api.py b/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon_unified_api.py
new file mode 100644
index 0000000..6c9f4f2
--- /dev/null
+++ b/python/pycarbon/tests/mnist/dataset_with_unischema/pytorch_example_carbon_unified_api.py
@@ -0,0 +1,177 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from __future__ import division, print_function
+
+import argparse
+
+import jnius_config
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+from torchvision import transforms
+
+from pycarbon.tests.mnist.dataset_with_unischema import DEFAULT_MNIST_DATA_PATH
+from pycarbon.tests import DEFAULT_CARBONSDK_PATH
+
+from petastorm import TransformSpec
+
+from pycarbon.reader import make_reader
+from pycarbon.reader import make_data_loader
+
+class Net(nn.Module):
+  def __init__(self):
+    super(Net, self).__init__()
+    self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
+    self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
+    self.conv2_drop = nn.Dropout2d()
+    self.fc1 = nn.Linear(320, 50)
+    self.fc2 = nn.Linear(50, 10)
+
+  # pylint: disable=arguments-differ
+  def forward(self, x):
+    x = F.relu(F.max_pool2d(self.conv1(x), 2))
+    x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
+    x = x.view(-1, 320)
+    x = F.relu(self.fc1(x))
+    x = F.dropout(x, training=self.training)
+    x = self.fc2(x)
+    return F.log_softmax(x, dim=1)
+
+
+def train(model, device, train_loader, log_interval, optimizer, epoch):
+  model.train()
+  for batch_idx, row in enumerate(train_loader):
+    data, target = row['image'].to(device), row['digit'].to(device)
+    optimizer.zero_grad()
+    output = model(data)
+    loss = F.nll_loss(output, target)
+    loss.backward()
+    optimizer.step()
+    if batch_idx % log_interval == 0:
+      print('Train Epoch: {} [{}]\tLoss: {:.6f}'.format(
+        epoch, batch_idx * len(data), loss.item()))
+
+
+def evaluation(model, device, test_loader):
+  model.eval()
+  test_loss = 0
+  correct = 0
+  count = 0
+  with torch.no_grad():
+    for row in test_loader:
+      data, target = row['image'].to(device), row['digit'].to(device)
+      output = model(data)
+      test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
+      pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+      correct += pred.eq(target.view_as(pred)).sum().item()
+      count += data.shape[0]
+
+  test_loss /= count
+  print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
+    test_loss, correct, count, 100. * correct / count))
+
+
+def _transform_row(mnist_row):
+  # For this example, the images are stored as simpler ndarray (28,28), but the
+  # training network expects 3-dim images, hence the additional lambda transform.
+  transform = transforms.Compose([
+    transforms.Lambda(lambda nd: nd.reshape(28, 28, 1)),
+    transforms.ToTensor(),
+    transforms.Normalize((0.1307,), (0.3081,))
+  ])
+  # In addition, the petastorm pytorch DataLoader does not distinguish the notion of
+  # data or target transform, but that actually gives the user more flexibility
+  # to make the desired partial transform, as shown here.
+  result_row = {
+    'image': transform(mnist_row['image']),
+    'digit': mnist_row['digit']
+  }
+
+  return result_row
+
+
+def main():
+  # Training settings
+  parser = argparse.ArgumentParser(description='Pycarbon MNIST Example')
+  default_dataset_url = 'file://{}'.format(DEFAULT_MNIST_DATA_PATH)
+  parser.add_argument('--dataset-url', type=str,
+                      default=default_dataset_url, metavar='S',
+                      help='hdfs:// or file:/// URL to the MNIST pycarbon dataset '
+                           '(default: %s)' % default_dataset_url)
+  parser.add_argument('--batch-size', type=int, default=64, metavar='N',
+                      help='input batch size for training (default: 64)')
+  parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
+                      help='input batch size for testing (default: 1000)')
+  parser.add_argument('--epochs', type=int, default=10, metavar='N',
+                      help='number of epochs to train (default: 10)')
+  parser.add_argument('--all-epochs', action='store_true', default=False,
+                      help='train all epochs before testing accuracy/loss')
+  parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
+                      help='learning rate (default: 0.01)')
+  parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
+                      help='SGD momentum (default: 0.5)')
+  parser.add_argument('--no-cuda', action='store_true', default=False,
+                      help='disables CUDA training')
+  parser.add_argument('--seed', type=int, default=1, metavar='S',
+                      help='random seed (default: 1)')
+  parser.add_argument('--log-interval', type=int, default=10, metavar='N',
+                      help='how many batches to wait before logging training status')
+  parser.add_argument('--carbon-sdk-path', type=str, default=DEFAULT_CARBONSDK_PATH,
+                      help='carbon sdk path')
+  args = parser.parse_args()
+  use_cuda = not args.no_cuda and torch.cuda.is_available()
+
+  jnius_config.set_classpath(args.carbon_sdk_path)
+
+  torch.manual_seed(args.seed)
+
+  device = torch.device('cuda' if use_cuda else 'cpu')
+
+  model = Net().to(device)
+  optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
+
+  # Configure loop and Reader epoch for illustrative purposes.
+  # Typical training usage would use the `all_epochs` approach.
+  #
+  if args.all_epochs:
+    # Run training across all the epochs before testing for accuracy
+    loop_epochs = 1
+    reader_epochs = args.epochs
+  else:
+    # Test training accuracy after each epoch
+    loop_epochs = args.epochs
+    reader_epochs = 1
+
+  transform = TransformSpec(_transform_row, removed_fields=['idx'])
+
+  # Instantiate each pycarbon Reader with a single thread, shuffle enabled, and appropriate epoch setting
+  for epoch in range(1, loop_epochs + 1):
+    with make_data_loader(make_reader('{}/train'.format(args.dataset_url), is_batch=False, num_epochs=reader_epochs,
+                                      transform_spec=transform),
+                          batch_size=args.batch_size) as train_loader:
+      train(model, device, train_loader, args.log_interval, optimizer, epoch)
+
+    with make_data_loader(make_reader('{}/test'.format(args.dataset_url), is_batch=False, num_epochs=reader_epochs,
+                                      transform_spec=transform),
+                          batch_size=args.test_batch_size) as test_loader:
+      evaluation(model, device, test_loader)
+
+
+if __name__ == '__main__':
+  main()
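
The `decimal_friendly_collate` function added in `python/pycarbon/integration/pytorch.py` recurses through a batch and passes `decimal.Decimal` values and strings through untouched, handing everything else to PyTorch's `default_collate`. The recursion can be sketched with the standard library alone. This is an illustration under stated assumptions, not the committed code: `fallback_collate` is a hypothetical stand-in that returns a plain list where the real function would build a tensor, and the sketch imports from `collections.abc` because the `collections.Mapping` and `collections.Sequence` aliases were removed in Python 3.10.

```python
import decimal
from collections.abc import Mapping, Sequence


def fallback_collate(batch):
    """Hypothetical stand-in for torch's default_collate: keeps the values in a list."""
    return list(batch)


def decimal_friendly_collate_sketch(batch):
    first = batch[0]
    if isinstance(first, decimal.Decimal):
        # Decimals are passed through uncollated
        return batch
    if isinstance(first, Mapping):
        # Collate each field across the rows of the batch
        return {key: decimal_friendly_collate_sketch([row[key] for row in batch])
                for key in first}
    if isinstance(first, (str, bytes)):
        # Strings are sequences too, so they are tested before Sequence
        return batch
    if isinstance(first, Sequence):
        # Transpose the batch, then collate position by position
        return [decimal_friendly_collate_sketch(list(samples)) for samples in zip(*batch)]
    return fallback_collate(batch)


batch = [
    {"id": 1, "ts": decimal.Decimal("1.5"), "name": "a", "pair": (1, 2)},
    {"id": 2, "ts": decimal.Decimal("2.5"), "name": "b", "pair": (3, 4)},
]
collated = decimal_friendly_collate_sketch(batch)
print(collated)
```

The order of the checks matters: because `str` and `bytes` are themselves sequences, testing for them after `Sequence` would split each string into characters during the transpose step.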