You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by GitBox <gi...@apache.org> on 2017/11/01 20:58:17 UTC

[GitHub] Luo-Liang closed pull request #8511: Dev temp

Luo-Liang closed pull request #8511: Dev temp
URL: https://github.com/apache/incubator-mxnet/pull/8511
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/.gitmodules b/.gitmodules
index 7a76cbaf78..32d740fb9e 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -3,10 +3,10 @@
 	url = https://github.com/dmlc/mshadow.git
 [submodule "dmlc-core"]
 	path = dmlc-core
-	url = https://github.com/dmlc/dmlc-core.git
+	url = https://github.com/Luo-Liang/dmlc-core.git
 [submodule "ps-lite"]
 	path = ps-lite
-	url = https://github.com/dmlc/ps-lite
+	url = https://github.com/Luo-Liang/ps-lite.git
 [submodule "nnvm"]
 	path = nnvm
 	url = https://github.com/dmlc/nnvm
diff --git a/Makefile b/Makefile
index e821c6faa5..0f287cae02 100644
--- a/Makefile
+++ b/Makefile
@@ -63,10 +63,10 @@ endif
 ifeq ($(DEBUG), 1)
 	CFLAGS += -g -O0
 else
-	CFLAGS += -O3 -DNDEBUG=1
+	CFLAGS += -O3 -DNDEBUG=1 -g
 endif
 CFLAGS += -I$(ROOTDIR)/mshadow/ -I$(ROOTDIR)/dmlc-core/include -fPIC -I$(NNVM_PATH)/include -I$(DLPACK_PATH)/include -Iinclude $(MSHADOW_CFLAGS)
-LDFLAGS = -pthread $(MSHADOW_LDFLAGS) $(DMLC_LDFLAGS)
+LDFLAGS = -pthread -libverbs -lnuma -lboost_filesystem -lboost_system $(MSHADOW_LDFLAGS) $(DMLC_LDFLAGS)
 ifeq ($(DEBUG), 1)
 	NVCCFLAGS += -std=c++11 -Xcompiler -D_FORCE_INLINES -g -G -O0 -ccbin $(CXX) $(MSHADOW_NVCCFLAGS)
 else
diff --git a/dmlc-core b/dmlc-core
index 595d02c0e8..85c2f51d58 160000
--- a/dmlc-core
+++ b/dmlc-core
@@ -1 +1 @@
-Subproject commit 595d02c0e87be8a0846700462b6f45f1b1031e39
+Subproject commit 85c2f51d58f280a03fa420ee01aab3257fafc654
diff --git a/example/image-classification/train_mnist.py b/example/image-classification/train_mnist.py
index 2bc4289318..e855f8e698 100644
--- a/example/image-classification/train_mnist.py
+++ b/example/image-classification/train_mnist.py
@@ -57,7 +57,7 @@ def get_mnist_iter(args, kv):
     (val_lbl, val_img) = read_data(
             't10k-labels-idx1-ubyte.gz', 't10k-images-idx3-ubyte.gz')
     train = mx.io.NDArrayIter(
-        to4d(train_img), train_lbl, args.batch_size, shuffle=True)
+        to4d(train_img), train_lbl, args.batch_size, shuffle=False)
     val = mx.io.NDArrayIter(
         to4d(val_img), val_lbl, args.batch_size)
     return (train, val)
diff --git a/include/mxnet/c_api.h b/include/mxnet/c_api.h
index 55b840dd2c..32d3259585 100644
--- a/include/mxnet/c_api.h
+++ b/include/mxnet/c_api.h
@@ -1530,6 +1530,13 @@ MXNET_DLL int MXInitPSEnv(mx_uint num_vars,
  */
 MXNET_DLL int MXKVStoreCreate(const char *type,
                               KVStoreHandle *out);
+
+/*!
+ * \brief Initialize PHUB training parameters
+ * \param num number of floats in vals
+ * \param vals optimizer values to hand to the van
+ */
+MXNET_DLL int MXInitPHUBOptimizerParam(mx_uint num, float* vals); 
 /*!
  * \brief Delete a KVStore handle.
  * \param handle handle to the kvstore
@@ -1549,6 +1556,12 @@ MXNET_DLL int MXKVStoreInit(KVStoreHandle handle,
                             const int* keys,
                             NDArrayHandle* vals);
 
+/*! \brief Init the kvstore with a single integer key paired with num NDArray handles */
+MXNET_DLL int MXKVStoreInitPHUB(KVStoreHandle handle,
+				const int key,
+				NDArrayHandle* vals,
+				mx_uint num);
+
 /*!
  * \brief Init a list of (key,value) pairs in kvstore, where each key is a string
  * \param handle handle to the kvstore
diff --git a/include/mxnet/ndarray.h b/include/mxnet/ndarray.h
index 84ee9fa5e4..fdd17b1755 100644
--- a/include/mxnet/ndarray.h
+++ b/include/mxnet/ndarray.h
@@ -37,6 +37,7 @@
 #include "./base.h"
 #include "./storage.h"
 #include "./engine.h"
+#include "../../ps-lite/include/dmlc/DIME.h" 
 #if MKL_EXPERIMENTAL == 1
 #include <mkl_memory.h>
 #endif
@@ -74,6 +75,16 @@ class NDArray {
     Mkl_mem_ = MKLMemHolder::create();
 #endif
   }
+
+  // Returns "" when is_none(); otherwise passes the buffer, read as float, its element count and firstN to SummarizeContinousBuffer.
+  std::string Summarize(int firstN=10)
+  {
+      if(is_none()) return "";
+      auto tblob = data();
+      auto ptr = (float*)tblob.dptr_;
+      auto sz = shape().Size();
+      return SummarizeContinousBuffer(ptr,sz,firstN);
+  }
   /*!
    * \brief constructs a new dynamic NDArray
    * \param shape the shape of array
@@ -539,6 +550,16 @@ class NDArray {
     ptr_->CheckAndAlloc();
   }
 
+  /*!
+   * \brief Replace the stored shape with one holding the same number of elements (checked); only shape_ is assigned.
+   */
+  inline void ReshapeInternalExact(const TShape &shape)
+  {
+      CHECK_EQ(shape.Size(), shape_.Size());
+      shape_ = shape;
+  }
+
+
   /*!
    * \brief Allocate the space if the allocation has been delayed
    * or the requested size is bigger than the available one.
diff --git a/make/config.mk b/make/config.mk
index d47d4d6931..67dc1fbf89 100644
--- a/make/config.mk
+++ b/make/config.mk
@@ -31,7 +31,7 @@ DEV = 0
 DEBUG = 0
 
 # whether compile with profiler
-USE_PROFILER =
+USE_PROFILER = 1
 
 # whether to turn on signal handler (e.g. segfault logger)
 USE_SIGNAL_HANDLER =
@@ -47,15 +47,15 @@ ADD_CFLAGS =
 #---------------------------------------------
 
 # whether use CUDA during compile
-USE_CUDA = 0
+USE_CUDA = 1
 
 # add the path to CUDA library to link and compile flag
 # if you have already add them to environment variable, leave it as NONE
-# USE_CUDA_PATH = /usr/local/cuda
-USE_CUDA_PATH = NONE
+USE_CUDA_PATH = /usr/local/cuda
+#USE_CUDA_PATH = NONE
 
 # whether use CuDNN R3 library
-USE_CUDNN = 0
+USE_CUDNN = 1
 
 # whether use opencv during compilation
 # you can disable it, however, you will not able to use
@@ -95,7 +95,7 @@ UNAME_S := $(shell uname -s)
 ifeq ($(UNAME_S), Darwin)
 USE_BLAS = apple
 else
-USE_BLAS = atlas
+USE_BLAS = openblas
 endif
 
 # whether use lapack during compilation
@@ -133,7 +133,7 @@ endif
 #----------------------------
 
 # whether or not to enable multi-machine supporting
-USE_DIST_KVSTORE = 0
+USE_DIST_KVSTORE = 1
 
 # whether or not allow to read and write HDFS directly. If yes, then hadoop is
 # required
diff --git a/ps-lite b/ps-lite
index bdd4c67e9e..65c6364bb3 160000
--- a/ps-lite
+++ b/ps-lite
@@ -1 +1 @@
-Subproject commit bdd4c67e9e34dc0b8350ce306b0caa737eb31c83
+Subproject commit 65c6364bb373138d4c47798c63e0f68e0f0415b3
diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py
index adfef9a949..b4a8f5c94e 100644
--- a/python/mxnet/kvstore.py
+++ b/python/mxnet/kvstore.py
@@ -63,6 +63,29 @@ def _ctype_key_value(keys, vals):
                  else c_array(ctypes.c_int, [keys] * len(vals))
         return (c_keys, c_array(NDArrayHandle, [value.handle for value in vals]), use_str_keys)
 
+def _ctype_key_value_PHUB(keys, vals):
+    """Return ctypes arrays for one PHUB key and its value(s), for internal use.
+
+    Parameters
+    ----------
+    keys : int
+        The single integer key.
+    vals : NDArray or iterable of NDArray
+        One array, or several arrays that all travel with ``keys``.
+
+    Returns
+    -------
+    tuple
+        ``(c_keys, c_vals)``: a one-element ``c_int`` array and an ``NDArrayHandle`` array.
+    """
+    assert isinstance(keys, int)
+    if isinstance(vals, NDArray):
+        vals = [vals]
+    for value in vals:
+        assert isinstance(value, NDArray)
+    return (c_array(ctypes.c_int, [keys]), c_array(NDArrayHandle, [value.handle for value in vals]))
+
+
 def _updater_wrapper(updater):
     """A wrapper for the user-defined handle."""
     def updater_handle(key, lhs_handle, rhs_handle, _):
@@ -135,6 +158,12 @@ def init(self, key, value):
             check_call(_LIB.MXKVStoreInitEx(self.handle, mx_uint(len(ckeys)), ckeys, cvals))
         else:
             check_call(_LIB.MXKVStoreInit(self.handle, mx_uint(len(ckeys)), ckeys, cvals))
+    
+    def initPHUB(self, key, value):
+        """PHuB initialization routine: pass integer `key` (signed, may be negative) and `value` arrays to MXKVStoreInitPHUB."""
+        _, cvals = _ctype_key_value_PHUB(key, value)
+        check_call(_LIB.MXKVStoreInitPHUB(
+            self.handle, ctypes.c_int(key), cvals, mx_uint(len(cvals))))
 
     def push(self, key, value, priority=0):
         """ Pushes a single or a sequence of key-value pairs into the store.
diff --git a/python/mxnet/model.py b/python/mxnet/model.py
index 2444ca0dc5..a143fb018d 100644
--- a/python/mxnet/model.py
+++ b/python/mxnet/model.py
@@ -38,6 +38,7 @@
 from .executor_manager import DataParallelExecutorManager, _check_arguments, _load_data
 from .io import DataDesc
 from .base import mx_real_t
+import os,math
 
 BASE_ESTIMATOR = object
 
@@ -93,14 +94,59 @@ def _create_kvstore(kvstore, num_device, arg_params):
 
     return (kv, update_on_kvstore)
 
-def _initialize_kvstore(kvstore, param_arrays, arg_params, param_names, update_on_kvstore):
-    """Initialize kvstore"""
+def _initialize_kvstore(kvstore, param_arrays, arg_params, param_names, update_on_kvstore, opt = None):
+    """Initialize kvstore, sending the PHUB control keys -1 and -2 around the real keys.
+
+    Parameters
+    ----------
+    kvstore : KVStore
+        Store to initialize; must provide ``initPHUB``, ``init`` and ``pull``.
+    param_arrays : list of list of NDArray
+        Per-parameter list of per-device arrays.
+    arg_params : dict
+        Indexed by parameter name to obtain each initial value.
+    param_names : list of str
+        Names matching ``param_arrays`` by position; the position is the key sent to the store.
+    update_on_kvstore : bool
+        When true, pull every key back right after initializing it.
+    opt : Optimizer, optional
+        Optimizer whose ``PHUBVK2PK`` (chunk index -> parameter index) map is filled in place.
+    """
+    # Without an optimizer there is no map to fill; use a scratch dict instead of crashing.
+    translation = opt.PHUBVK2PK if opt is not None else {}
+    bound = os.environ.get('MXNET_KVSTORE_BIGARRAY_BOUND')
+    vanType = os.environ.get('PSLITE_VAN_TYPE')
+    totalSize = 0
+    # Chunk size in elements; 512 * 1024 is the fallback when the variable is unset.
+    ssize = int(bound) if bound is not None else 512 * 1024
+    for idx, param_on_devs in enumerate(param_arrays):
+        chunk = int(math.ceil(1.0 * param_on_devs[0].size / ssize))
+        for _ in range(chunk):
+            translation[len(translation)] = idx
+    flattened = [item for sublist in param_arrays for item in sublist]
+    # First control call: key -1 carries every per-device array at once.
+    kvstore.initPHUB(-1, flattened)
+    if vanType is None or vanType == 'zmq':
+        translation.clear() #zmq does not have our key chunking.
     for idx, param_on_devs in enumerate(param_arrays):
-        name = param_names[idx]
-        kvstore.init(name, arg_params[name])
-
+        # The integer position is the key; the name only selects the initial value.
+        name = param_names[idx]
+        kvstore.init(idx, arg_params[name])
+        # Optionally fetch the stored value back into every device array.
         if update_on_kvstore:
-            kvstore.pull(name, param_on_devs, priority=-idx)
+            kvstore.pull(idx, param_on_devs, priority=-idx)
+            # Bytes in one device copy of this parameter, assuming 4-byte elements.
+            # The total only feeds the diagnostic print kept (commented out) below.
+            totalSize += param_on_devs[0].size * 4
+    # print "[info] total model size per worker  = %s. total number of params = %s" % (str(totalSize), len(arg_params))
+    #
+    # Second control call, issued after every real key has been initialized.
+    # Notes from the original author:
+    #for now, comm_buf is used as send buffer, and we need to communicate address of pull buffers
+    #we also communicate the size implicitly
+    #-2 signals onkeypopulated.
+    # The arrays are the same flattened list that was sent with key -1.
+    kvstore.initPHUB(-2, flattened)
 
 def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names):
     """Perform update of param_arrays from grad_arrays on kvstore."""
@@ -110,9 +156,13 @@ def _update_params_on_kvstore(param_arrays, grad_arrays, kvstore, param_names):
             continue
         name = param_names[index]
         # push gradient, priority is negative index
-        kvstore.push(name, grad_list, priority=-index)
+        #kvstore.push(name, grad_list, priority=-index)
+        #please, use integer keys
+        kvstore.push(index, grad_list, priority=-index)
         # pull back the weights
-        kvstore.pull(name, arg_list, priority=-index)
+        #kvstore.pull(name, arg_list, priority=-index)
+        kvstore.pull(index, arg_list, priority=-index)
+
 
 def _update_params(param_arrays, grad_arrays, updater, num_device,
                    kvstore=None, param_names=None):
@@ -230,6 +280,7 @@ def _train_multi_device(symbol, ctx, arg_names, param_names, aux_names,
 
     executor_manager.set_params(arg_params, aux_params)
 
+    print('Your optimizer is ' + str(optimizer))
     if not update_on_kvstore:
         updater = get_updater(optimizer)
 
@@ -238,7 +289,7 @@ def _train_multi_device(symbol, ctx, arg_names, param_names, aux_names,
                             param_arrays=executor_manager.param_arrays,
                             arg_params=arg_params,
                             param_names=executor_manager.param_names,
-                            update_on_kvstore=update_on_kvstore)
+                            update_on_kvstore=update_on_kvstore, opt=optimizer)
 
     if update_on_kvstore:
         kvstore.set_optimizer(optimizer)
diff --git a/python/mxnet/module/module.py b/python/mxnet/module/module.py
index 4c20a6fed5..baa7933ba1 100644
--- a/python/mxnet/module/module.py
+++ b/python/mxnet/module/module.py
@@ -526,7 +526,7 @@ def init_optimizer(self, kvstore='local', optimizer='sgd',
                                 param_arrays=self._exec_group.param_arrays,
                                 arg_params=self._arg_params,
                                 param_names=self._param_names,
-                                update_on_kvstore=update_on_kvstore)
+                                update_on_kvstore=update_on_kvstore, opt=optimizer)
         if update_on_kvstore:
             kvstore.set_optimizer(self._optimizer)
         else:
@@ -627,6 +627,12 @@ def update(self):
             _update_params_on_kvstore(self._exec_group.param_arrays,
                                       self._exec_group.grad_arrays,
                                       self._kvstore, self._exec_group.param_names)
+            #_initialize_kvstore(kvstore=kvstore,
+            #                    param_arrays=self._exec_group.param_arrays,
+            #                    arg_params=self._arg_params,
+            #                    param_names=self._param_names,
+            #                    update_on_kvstore=update_on_kvstore, opt=optimizer)
+
         else:
             _update_params(self._exec_group.param_arrays,
                            self._exec_group.grad_arrays,
diff --git a/python/mxnet/optimizer.py b/python/mxnet/optimizer.py
index 66c261b880..7858364913 100644
--- a/python/mxnet/optimizer.py
+++ b/python/mxnet/optimizer.py
@@ -34,6 +34,27 @@
 
 
 class Optimizer(object):
+    def UpdateVanOptimizers(self):
+        """Send per-key optimizer settings through ``MXInitPHUBOptimizerParam``.
+
+        The float buffer holds, for ``keySize = len(self.PHUBVK2PK)``: one learning rate
+        per key, then one weight decay per key, then momentum, then rescale_grad.
+        Nothing is sent when the map is empty (zmq may not have it).
+        """
+        keySize = len(self.PHUBVK2PK)
+        if keySize == 0:
+            return
+        objCount = 2 * keySize + 2
+        vals = (ctypes.c_float * objCount)()
+        for i in range(keySize):
+            vals[i] = self._get_lr(self.PHUBVK2PK[i])
+        for i in range(keySize):
+            vals[i + keySize] = self._get_wd(self.PHUBVK2PK[i])
+        vals[2 * keySize] = self.momentum
+        vals[2 * keySize + 1] = self.rescale_grad
+        ptr = ctypes.cast(vals, ctypes.POINTER(ctypes.c_float))
+        # Call exactly once, after the whole buffer has been filled.
+        _LIB.MXInitPHUBOptimizerParam(objCount, ptr)
     """The base class inherited by all optimizers.
 
     Parameters
@@ -102,6 +123,7 @@ def __init__(self, rescale_grad=1., param_idx2name=None, wd=0.,
             'param_idx2name should be a dict of param indexes to names.'
         self.idx2name = param_idx2name.copy()
         self.sym = sym
+        self.PHUBVK2PK = {}
         self.param_dict = param_dict if param_dict else {}
 
         self.set_lr_mult({})
@@ -598,6 +620,11 @@ def __init__(self, **kwargs):
     def update(self, index, weight, grad, state):
         assert(isinstance(weight, NDArray))
         assert(isinstance(grad, NDArray))
+        #return # for sanity checks
+        #print "warning error note info phubvk2pk= ", str(self.PHUBVK2PK)
+        if index in self.PHUBVK2PK:
+            index = self.PHUBVK2PK[index]
+
         self._update_count(index)
         lr = self._get_lr(index)
         wd = self._get_wd(index)
diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc
index 1d348a5b40..3bd3643866 100644
--- a/src/c_api/c_api.cc
+++ b/src/c_api/c_api.cc
@@ -106,19 +106,6 @@ int MXSetProfilerConfig(int mode, const char* filename) {
   API_END();
 }
 
-int MXDumpProfile() {
-  API_BEGIN();
-#if MXNET_USE_PROFILER
-  engine::Profiler *profiler = engine::Profiler::Get();
-  CHECK(profiler->IsEnableOutput())
-    << "Profiler haven't been run. Config and start profiler first";
-  engine::Profiler::Get()->DumpProfile();
-#else
-  LOG(FATAL) << "Need to compile with USE_PROFILER=1 for MXNet Profiler";
-#endif
-  API_END()
-}
-
 int MXSetProfilerState(int state) {
   // state, kNotRunning: 0, kRunning: 1
   API_BEGIN();
@@ -668,6 +655,16 @@ int MXDataIterCreateIter(DataIterCreator creator,
   API_END_HANDLE_ERROR(delete iter);
 }
 
+// Hands num_params optimizer floats to the van, but only when the van type is "pshub".
+int MXInitPHUBOptimizerParam(mx_uint num_params, float* vals) {
+  API_BEGIN();
+  auto* van = ps::Postoffice::Get()->van();
+  if (van->VanType() == "pshub") {
+    van->InitializeVanOptimizerValues(num_params, vals);
+  }
+  API_END();
+}
+
 int MXDataIterFree(DataIterHandle handle) {
   API_BEGIN();
   delete static_cast<IIterator<DataBatch> *>(handle);
@@ -754,6 +751,24 @@ int MXKVStoreInit(KVStoreHandle handle,
   API_END();
 }
 
+MXNET_DLL int MXKVStoreInitPHUB(KVStoreHandle handle,
+                                const int key,
+                                NDArrayHandle* vals,
+                                mx_uint num)
+{
+    // Packs the single key and the num NDArray handles into vectors for KVStore::Init.
+    API_BEGIN();
+    const std::vector<int> v_keys(1, key);
+    std::vector<NDArray> v_vals;
+    v_vals.reserve(num);
+    for (mx_uint i = 0; i < num; ++i) {
+        v_vals.push_back(*static_cast<NDArray*>(vals[i]));
+    }
+    static_cast<KVStore*>(handle)->Init(v_keys, v_vals);
+    API_END();
+}
+
+
 int MXKVStoreInitEx(KVStoreHandle handle,
                   mx_uint num,
                   const char** keys,
@@ -885,6 +900,18 @@ void MXKVStoreSetUpdaterImpl(KVStoreHandle handle,
   static_cast<KVStore*>(handle)->set_updater(updt);
 }
 
+// Dumps the collected profile through Profiler::DUMP(); fatal unless built with the profiler.
+int MXDumpProfile() {
+    API_BEGIN();
+#if MXNET_USE_PROFILER
+    engine::Profiler *profiler = engine::Profiler::Get();
+    CHECK(profiler->IsEnableOutput()) << "Profiler haven't been run. Config and start profiler first";
+    profiler->DUMP();
+#else
+    LOG(FATAL) << "Need to compile with USE_PROFILER=1 for MXNet Profiler";
+#endif
+    API_END();
+}
 int MXKVStoreSetUpdater(KVStoreHandle handle,
                         MXKVStoreUpdater updater,
                         void* updater_handle) {
diff --git a/src/engine/profiler.cc b/src/engine/profiler.cc
index 99504f61ce..a7ec2952ac 100644
--- a/src/engine/profiler.cc
+++ b/src/engine/profiler.cc
@@ -31,6 +31,8 @@
 #include <iostream>
 #include <fstream>
 #include "./profiler.h"
+#include <cstdlib> 
+//#include "ps/ps.h"
 
 #if defined(_MSC_VER) && _MSC_VER <= 1800
 #include <Windows.h>
@@ -151,6 +153,13 @@ void Profiler::EmitEvent(std::ostream *os, const std::string& name,
 
 void Profiler::DumpProfile() {
   SetState(kNotRunning);
+  // DMLC_TRACKER_TOTAL_ID may be unset; streaming a null char* is undefined behaviour,
+  // so fall back to a fixed tag in that case.
+  const char* tracker_id = getenv("DMLC_TRACKER_TOTAL_ID");
+  std::stringstream fName;
+  fName << "PERFDIAG-" << (tracker_id != nullptr ? tracker_id : "unknown") << ".json";
+
+  filename_ = fName.str();
 
   std::lock_guard<std::mutex> lock{this->m_};
   std::ofstream file;
@@ -175,7 +184,7 @@ void Profiler::DumpProfile() {
 
     for (uint32_t j = 0; j < opr_num; ++j) {
       const OprExecStat* opr_stat = d.opr_exec_stats[j];
-
+      if(opr_stat->WellFormed == false) continue;
       uint32_t pid = i;
       uint32_t tid = opr_stat->thread_id;
 
@@ -227,6 +236,7 @@ void SetOprEnd(OprExecStat* opr_stat) {
     LOG(WARNING) << "SetOpEnd: nullptr";
     return;
   }
+  opr_stat->WellFormed = true;
   opr_stat->opr_end_rel_micros   = NowInUsec() - Profiler::Get()->GetInitTime();
 }
 
diff --git a/src/engine/profiler.h b/src/engine/profiler.h
index b7f8e0e1f0..d7f5966a95 100644
--- a/src/engine/profiler.h
+++ b/src/engine/profiler.h
@@ -57,6 +57,7 @@ struct OprExecStat {
   uint32_t dev_type;
   /*! \brief device id */
   uint32_t dev_id;
+    bool WellFormed = false;
 };
 
 /*!
@@ -113,6 +114,14 @@ class Profiler {
   OprExecStat* AddOprStat(int dev_type, uint32_t dev_id);
   /*! \return Profiler singleton */
   static Profiler* Get();
+  /*!
+   * \brief Call DumpProfile(); a non-empty fileName is stored in filename_ first.
+   */
+  void DUMP(std::string fileName = "")
+  {
+      if (!fileName.empty()) filename_ = fileName;
+      DumpProfile();
+  }
 
  protected:
   /*! \brief make constructor protected. */
@@ -146,7 +155,6 @@ class Profiler {
   /*! \brief the profiler init time */
   uint64_t init_time_;
 };
-
 /*! \return current clock time, time unit is microsecond (10^-6 s) */
 inline uint64_t NowInUsec();
 /*! \brief set operation execution start timestamp */
diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h
index deed1a15c9..724ae46785 100644
--- a/src/kvstore/comm.h
+++ b/src/kvstore/comm.h
@@ -91,7 +91,7 @@ class CommCPU : public Comm {
  public:
   CommCPU() {
     nthread_reduction_ = dmlc::GetEnv("MXNET_KVSTORE_REDUCTION_NTHREADS", 4);
-    bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 1000 * 1000);
+    bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 512 * 1024);
     // TODO(junwu) delete the following data member, now for benchmark only
     is_serial_push_ = dmlc::GetEnv("MXNET_KVSTORE_SERIAL_PUSH", 0);
   }
diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h
index 5e62be8c4c..b89f6ea0ba 100644
--- a/src/kvstore/kvstore_dist.h
+++ b/src/kvstore/kvstore_dist.h
@@ -31,6 +31,8 @@
 #include "mxnet/engine.h"
 #include "ps/ps.h"
 #include "./kvstore_dist_server.h"
+#include "../../ps-lite/src/infiniband_van.h"
+#include "../../ps-lite/src/PHubAllocator.h"
 #if MKL_EXPERIMENTAL == 1
 #include <mkl_memory.h>
 #include "../operator/mkl/mkl_memory-inl.h"
@@ -57,15 +59,18 @@ class KVStoreDist : public KVStoreLocal {
           ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler);
       }
     }
-    bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 1000 * 1000);
+    bigarray_bound_ = dmlc::GetEnv("MXNET_KVSTORE_BIGARRAY_BOUND", 512 * 1024);
+    CHECK(bigarray_bound_ % 1024 == 0);
+    pinned_ctx_ = Context::CPU();
     log_verbose_ = dmlc::GetEnv("MXNET_KVSTORE_DIST_ROW_SPARSE_VERBOSE", false);
+    printf("[note]Keys with element larger than %d will be splitted\n", bigarray_bound_);
   }
 
   virtual ~KVStoreDist() {
     Engine::Get()->WaitForAll();
     if (IsWorkerNode()) {
       if (barrier_before_exit_) {
-        Barrier();
+        Barrier("KVStore Finalizer");
         if (get_rank() == 0) {
           // stop the executor at servers
           SendCommandToServers(kStopServer, "");
@@ -85,7 +90,7 @@ class KVStoreDist : public KVStoreLocal {
     }
   }
 
-  void Barrier() override {
+  void Barrier(std::string name) {
     ps::Postoffice::Get()->Barrier(ps::kWorkerGroup);
   }
 
@@ -121,7 +126,26 @@ class KVStoreDist : public KVStoreLocal {
     if (!ps::Postoffice::Get()->is_recovery()) {
       ps::Postoffice::Get()->Barrier(
         ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler);
+    }//initialization call.
+
+    //this is the wait for all key size setup call
+    if (!ps::Postoffice::Get()->is_recovery()) {
+	ps::Postoffice::Get()->Barrier(
+	    ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler, "WorkerKeyPopulation");
     }
+
+    //now call to setup infiniband qps and etc.
+
+    if (IsServerNode())
+	LOG(INFO) << "[IMPORTANT BOOTSTRAP]Server about to call OnKeyPopulated!";
+
+    ps::Postoffice::Get()->van()->OnKeyPopulated();
+    if (!ps::Postoffice::Get()->is_recovery()) {
+	ps::Postoffice::Get()->Barrier(
+	    ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler, "OnKeyPopulated");
+    }
+
+    
     if (server_) server_->Run();
     ps::Finalize();
     if (server_) {
@@ -131,23 +155,332 @@ class KVStoreDist : public KVStoreLocal {
   }
 
  private:
+
+  //translate a physical key into a virtual key
+  std::vector<int> VirtualKeyTranslation;
+  //size of each virtual key
+  std::vector<int> VirtualKeySizes;
+  //translates a virtual key into a physical key.
+  std::vector<int> PhysicalKeyTranslation;
+  std::vector<std::vector<NDArray*>> PhysicalKeyPullAddress;
+
+  /*!
+   * \brief Split physical keys into virtual keys (chunks) and record the mappings.
+   *
+   * A key holding more than bigarray_bound_ elements is cut into
+   * ceil(size / bigarray_bound_) pieces; any other key remains one piece.
+   * Every piece appends its element count to VirtualKeySizes and its owning
+   * physical key to PhysicalKeyTranslation. Every physical key appends the
+   * index of its first piece to VirtualKeyTranslation.
+   *
+   * Example, starting from empty tables, bigarray_bound_ = 1024, size = {1000, 2500}:
+   *   VirtualKeySizes        = {1000, 1024, 1024, 452}
+   *   PhysicalKeyTranslation = {0, 1, 1, 1}
+   *   VirtualKeyTranslation  = {0, 1}
+   *
+   * \param size element count of each physical key, indexed by physical key
+   */
+  void PopulateVirtualKey(std::vector<int>& size)
+  {
+      int firstVirtualKey = 0;
+      for (int pk = 0; pk < size.size(); pk++)
+      {
+          int pieces = 1;
+          if (size[pk] > bigarray_bound_)
+          {
+              pieces = (int)ceil(1.0 * size[pk] / bigarray_bound_);
+          }
+          for (int p = 0; p < pieces; p++)
+          {
+              int pieceSize;
+              if (pieces == 1)
+              {
+                  //unsplit key: the piece is the whole key.
+                  pieceSize = size[pk];
+              }
+              else if (p != pieces - 1)
+              {
+                  //mid part. this is bigarray_bound
+                  pieceSize = bigarray_bound_;
+              }
+              else
+              {
+                  //last piece: whatever remains after the full-sized pieces.
+                  //equals bigarray_bound_ when the size is an exact multiple of it.
+                  pieceSize = size[pk] - bigarray_bound_ * p;
+              }
+              //vkey = VirtualKeySizes.size() - 1 after this push, pkey = pk.
+              VirtualKeySizes.push_back(pieceSize);
+              PhysicalKeyTranslation.push_back(pk);
+          }
+          VirtualKeyTranslation.push_back(firstVirtualKey);
+          firstVirtualKey += pieces;
+      }
+      //one entry per virtual key in both per-piece tables.
+      CHECK(PhysicalKeyTranslation.size() == VirtualKeySizes.size());
+      //there are never more physical keys than virtual keys.
+      CHECK(VirtualKeyTranslation.size() <= PhysicalKeyTranslation.size());
+  }
+
+  //returns the physical key that owns virtual key vk; vk must index PhysicalKeyTranslation.
+  int RetrievePhysicalKeyFromVirtualKey(int vk) {
+      CHECK(vk >= 0 && static_cast<size_t>(vk) < PhysicalKeyTranslation.size());
+      return PhysicalKeyTranslation[vk];
+  }
+
+  //returns the recorded size of virtual key kv; kv must index VirtualKeySizes.
+  int RetrieveVirtualKeySizeFromVirtualKey(int kv) {
+      CHECK(kv >= 0 && static_cast<size_t>(kv) < VirtualKeySizes.size());
+      return VirtualKeySizes[kv];
+  }
+
+  //returns the first virtual key of physical key `key` plus `part`; only `key` is range-checked.
+  int RetrieveVirtualKeyFromPhysicalKey(int key, int part) {
+      CHECK(key >= 0 && static_cast<size_t>(key) < VirtualKeyTranslation.size());
+      return VirtualKeyTranslation[key] + part;
+  }
+
+
   void InitImpl(const std::vector<int>& keys,
                 const std::vector<NDArray>& values) override {
     CheckUnique(keys);
-    for (size_t i = 0; i < keys.size(); ++i) {
-      comm_->Init(keys[i], values[i].storage_type(), values[i].shape(), values[i].dtype());
-    }
-    if (get_rank() == 0) {
-      Push_(keys, values, 0, false);
-      // wait until the push is finished
-      for (const int key : keys) {
-        comm_buf_[key].WaitToWrite();
-      }
-    } else {
-      // do nothing
+    if(ps::Postoffice::Get()->van()->HasFeature(ps::Van::SupportsKeyChunking) == false)
+    {
+	//ZMQ.
+	if(keys[0] < 0)
+	{
+	    if(ps::Postoffice::Get()->is_recovery() == false)
+	    {
+		ps::Postoffice::Get()->Barrier(
+		    ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler, keys[0] == -1 ? "WorkerKeyPopulation" : "OnKeyPopulated");
+		return;
+	    }
+	}
+	for (size_t i = 0; i < keys.size(); ++i) {
+	    comm_->Init(keys[i], values[i].storage_type(), values[i].shape(), values[i].dtype());
+	}
+	if (get_rank() == 0) {
+	    printf("Worker 0 is pushing key %d\n", keys[0]);
+	    Push_(keys, values, 0, false);
+	    // wait until the push is finished
+	    for (const auto& v : values) {
+		v.WaitToWrite();
+	    }
+	    // wait until the push is finished
+	    for (const int key : keys) {
+		comm_buf_[key].WaitToWrite();
+	    }
+	    printf("Worker 0 has finished pushing key %d\n", keys[0]);	    
+	} else {
+	    // do nothing
+	}
+	if (!ps::Postoffice::Get()->is_recovery()) {
+	    Barrier("Key Init " + std::to_string(keys[0]));
+	}
     }
-    if (!ps::Postoffice::Get()->is_recovery()) {
-      Barrier();
+    else
+    {
+	//LOG(INFO)<<"Worker Important Bootstrap K = "<<keys[0];
+	if (keys.size() != 1 || keys[0] >= 0)
+	{
+	    //copy whatever code it was originally.
+	    CheckUnique(keys);
+	    for (size_t i = 0; i < keys.size(); ++i) {
+		//if extended profiler is turned on, we need to assign profiler names.
+		//const char* opr_name 
+		comm_->Init(keys[i], values[i].storage_type(), values[i].shape(), values[i].dtype());
+		//first chunk the key if value is too large.
+		//int chunks = (int)ceil(1.0  * values[i].shape().Size() / bigarray_bound_);
+		//previously we registered the buffer, but we havent resize those yet.
+		//remember, buf is per physical key.
+		auto& buf = comm_buf_[keys[i]];
+		if (buf.is_none())
+		{
+		    //some keys may not be trainable, but may still be pushed.
+		    buf = NDArray(values[i].shape(), Context::CPU());
+		}
+		else
+		{
+		    //printf("key resizing key=%d, size=%d\n",keys[i],values[i].shape().Size());
+		    buf.ReshapeInternalExact(values[i].shape());
+		}
+		// printf("initializing key = %d\n",keys[0]);
+
+	    }
+	    if (get_rank() == 0) {
+		//PHUB auto chunk. No need to chunk for it.
+		Push_(keys, values, 0, false);
+		// wait until the push is finished
+		for (const auto& v : values) {
+		    v.WaitToWrite();
+		}
+		//the above is not sufficient because we copied value to comm_buf, which should in turn be waited for.
+		for (auto k : keys)
+		{
+		    CHECK(comm_buf_[k].is_none() == false);
+		    comm_buf_[k].WaitToWrite();
+		}
+
+		//in fact, should wait on 
+		//printf("rank 0 init %d ready\n", keys[0]);
+		LOG(INFO)<<"worker 0 setting up "<<keys.size()<<" keys";
+	    }
+	    else {
+		// do nothing
+	    }
+	    if (!ps::Postoffice::Get()->is_recovery()) {
+		Barrier("rank 0 initialization " + std::to_string(keys[0]));
+	    }
+	    //printf("rank %d finished key %d\n", get_rank(), keys[0]);
+	}
+	else if (keys[0] == -1)
+	{
+	    //this is the initialization code
+	    //everyone, setup your key counts!
+	    //first figure out how many keys are there.
+
+	    auto keyCounts = values.size();
+	    std::vector<int> physicalKeySizes;
+	    for (int i = 0; i < keyCounts; i++)
+	    {
+		physicalKeySizes.push_back(values[i].shape().Size());
+	    }
+	    PhysicalKeyPullAddress.resize(keyCounts);
+	    PopulateVirtualKey(physicalKeySizes);
+	    //first, populate PK to VK translation
+
+	    auto allocator = PHubAllocator::Get();
+	    CHECK(allocator->IsInitialized() == false);
+	    //make sure it's not initialized.
+	    //now create a map that allocator uses.
+	    //note that we need VIRTUAL keys here.
+	    std::unordered_map<int, int> tempVKBuf;
+	    for (size_t i = 0; i < VirtualKeySizes.size(); i++)
+	    {
+		tempVKBuf[i] = VirtualKeySizes[i] * sizeof(float); //allocator speaks bytes, not element counts
+	    }
+
+	    //initialize verbs if possible
+	    //initialize my verbs.
+	    int socketCnt = 1;
+	    if (ps::Postoffice::Get()->van()->HasFeature(Van::NativeInfiniband))
+	    {
+		auto pVan = (InfiniBandVan*)ps::Postoffice::Get()->van();
+		int qpCnt = pVan->QPCountOverride > 0 ? pVan->QPCountOverride : (int)tempVKBuf.size();
+		//don't have a prefered interface.
+		//see if DirectConnect is specified.
+		auto dConn = Environment::Get()->find("IB_DIRECT_CONNECT") != NULL;
+		pVan->verbs = new Verbs(qpCnt, pVan->GetUnderlyingWorkerThreadCount(),
+					(int)ps::Postoffice::Get()->van()->my_node().id, tempVKBuf, dConn, "");
+		socketCnt = pVan->verbs->SocketCount;
+	    }
+	    allocator->Init(tempVKBuf, false, 1, sizeof(ps::MetaSlim), socketCnt, VirtualKeySizes.size() == physicalKeySizes.size());
+	    //We have not registered buffer yet.
+	    ps_worker_->PHUBDeterministicCallbacks.resize(physicalKeySizes.size(), nullptr);
+	    ps_worker_->PHUBDeterministicChunkCounters.resize(physicalKeySizes.size(), 0);
+	    ps_worker_->PHUBVirtualToPhysicalKeyMapping = PhysicalKeyTranslation;
+
+	    if (get_rank() == 0)
+	    {
+		//printf("[warning] PhysicalKey=%d, VirtualKey=%d. pushed to server\n", physicalKeySizes.size(), VirtualKeySizes.size());
+		//bypass engine.
+		CHECK(sizeof(float) == sizeof(int));
+		auto sKeys = ps::SArray<ps::Key>(1, -1);
+		auto sVSizes = ps::SArray<real_t>((real_t*)VirtualKeySizes.data(), VirtualKeySizes.size(), false);
+		auto sLen = ps::SArray<int>(1, VirtualKeySizes.size());
+		//ZPush is used because comm_ isnt initialized.
+		ps_worker_->Wait((ps_worker_)->ZPush(sKeys, sVSizes, sLen));
+	    }
+
+	    //initialize comm buffers.
+	    for (int i = 0; i < keyCounts; i++)
+	    {
+		auto& buf = comm_buf_[i];
+		//defensive
+		CHECK(buf.is_none());
+		if (buf.is_none()) {
+		    //??? what would be the shape?
+		    //we currently do not know. just create the same size, then resize it
+		    //when we know its exact size later.
+		    //Here, instead of using the default allocator, we must use PHUBAllocator if we want 0 copy.
+		    //we can learn how to create a NDArray from an existing buffer from kvstore_dist_server.h
+		    size_t len;
+		    //make sure the allocated buffer is at least as large.
+		    auto vk = RetrieveVirtualKeyFromPhysicalKey(i, 0);
+		    auto socketIdx = 0;
+		    if (ps::Postoffice::Get()->van()->HasFeature(Van::NativeInfiniband))
+		    {
+			auto verbs = ((InfiniBandVan*)ps::Postoffice::Get()->van())->verbs;
+			socketIdx = verbs->Helper_Worker_GetEndpointFromKey(i).SocketIdx;
+
+		    }
+		    auto workerBuf = allocator->WorkerKVBuffer(vk, socketIdx, len);
+		    //CHECK(len >= cntDbg);
+		    //the buffer len allocated should be larger than key (larger for padding reasons).
+		    //also the size of the PHYSICAL key is used because all virtual keys are contiguous in Worker.
+		    TBlob recv_blob((real_t*)workerBuf, // NOLINT(*)
+                                    values[i].shape(), cpu::kDevMask);
+		    buf = NDArray(recv_blob, 0);
+
+		    //buf = NDArray(values[i].shape(), Context::CPU());
+		}
+		CHECK(buf.is_none() == false);
+
+	    }
+
+	    //now i know there are many keys.
+	    for (int i = 0; i < keyCounts; i++)
+	    {
+		auto& buf = comm_buf_[i];
+		CHECK(buf.is_none() == false);
+		CHECK(buf.shape().Size() == values[i].shape().Size());
+		//i need to somehow communicate with IBVerbs.
+		int chunks = (int)ceil(1.0 * physicalKeySizes[i] / bigarray_bound_);
+		//you only need to register once and prefer registering with larger chunks.
+		real_t* currentBufferStart = (real_t*)buf.data().dptr_;
+		real_t* currentRecvBufferStart = (real_t*)values[i].data().dptr_;
+		auto remaining = physicalKeySizes[i];
+		for (int c = 0; c < chunks; c++)
+		{
+		    int curr = remaining >= bigarray_bound_ ? bigarray_bound_ : remaining;
+		    if (chunks == 1)
+			curr = physicalKeySizes[i];
+		    auto vk = RetrieveVirtualKeyFromPhysicalKey(i, c);
+		    //printf("[Worker0]VK = %d has size %d, read location = %p, write location = %p\n", vk, curr, currentBufferStart, currentRecvBufferStart);
+		    //printf("Worker registration self key=%d\n",vk);
+		    CHECK(currentBufferStart != NULL);
+		    //Note these two buffers are identical, because we're using comm_buf to synchronize.
+		    //This violates the previous assumption that we can deliver the buffer directly without a copy, but since a GPU is involved a copy is forced anyway.
+		    ps::Postoffice::Get()->van()->SetKeySize(vk, sizeof(real_t) * curr, (uint64_t)currentBufferStart, (uint64_t)currentBufferStart);
+		    CHECK(curr == RetrieveVirtualKeySizeFromVirtualKey(vk)) << " vk=" << vk << " " << curr << " vs " << RetrieveVirtualKeySizeFromVirtualKey(vk);
+		    //... redundant calculations. But initialize code only run once.
+		    remaining -= curr;
+		    currentBufferStart += curr;
+		    currentRecvBufferStart += curr;
+		}
+		CHECK(remaining == 0);
+		//assuming there's only one server.
+		//CHECK_EQ(ps::Postoffice::Get()->num_servers(), 1);
+		//printf("[%d]Worker local key population\n",ps::Postoffice::Get()->van()->my_node().id);
+	    }
+	    //so hacky. but this is to signal OnKeyPopulated need to be called.
+	    if (!ps::Postoffice::Get()->is_recovery()) {
+		ps::Postoffice::Get()->Barrier(
+		    ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler, "WorkerKeyPopulation");
+	    }
+	}
+	else if (keys[0] == -2)
+	{
+	    //everyone, now initiate phase 1 initialization of infiniband.
+	    ps::Postoffice::Get()->van()->OnKeyPopulated();
+	    if (!ps::Postoffice::Get()->is_recovery()) {
+		ps::Postoffice::Get()->Barrier(ps::kWorkerGroup + ps::kServerGroup + ps::kScheduler, "OnKeyPopulated");
+	    }
+	}
+	else
+	{
+	    CHECK(false);
+	}
     }
   }
 
@@ -177,20 +510,52 @@ class KVStoreDist : public KVStoreLocal {
         recv_buf = NDArray(grouped_vals[i][0]->shape(), pinned_ctx_,
                            true, grouped_vals[i][0]->dtype());
       }
-      auto pull_from_servers = [this, key, recv_buf](
+      real_t* data = static_cast<real_t*>(recv_buf.data().dptr_);
+      size_t size = recv_buf.shape().Size();
+      
+      if (ps::Postoffice::Get()->van()->HasFeature(ps::Van::PullRequestElision) == true)
+      {
+	  //This pull is PHYSICAL key.
+	  CHECK(uniq_keys.size() == 1);
+	  if (PhysicalKeyPullAddress.at(key).size() == 0)
+	  {
+	      //make sure this van is not fully initialized. otherwise logic issue
+	      CHECK(ps::Postoffice::Get()->van()->FullyInitialized() == false);
+	      for (auto addr : values)
+	      {
+		  PhysicalKeyPullAddress.at(key).push_back(addr);
+	      }
+	  }
+	  else
+	  {
+	      //safely continue.
+	      //another way to do this is to still issue a pull if mismatch.
+	      //but i dont think the address is really changing.
+	      //CHECK(values.size() == 1);
+	      CHECK(values.size() == PhysicalKeyPullAddress.at(key).size());
+	      for (size_t i = 0; i < values.size(); i++)
+	      {
+		  CHECK(values[i] == PhysicalKeyPullAddress.at(key).at(i));
+	      }
+	      //printf("key = %d elision success. \n", key);
+	      continue;
+	  }
+	  //CHECK that elision logic is correct.
+      }
+      auto pull_from_servers = [this, key, data,size](
           RunContext rctx, Engine::CallbackOnComplete cb) {
         // convert to ps keys
-        size_t size = recv_buf.shape().Size();
+	  //size_t size = recv_buf.shape().Size();
         PSKV& pskv = EncodeKey(key, size);
 #if MKL_EXPERIMENTAL == 1
         mkl_set_tblob_eager_mode(recv_buf.data());
 #endif
-        real_t* data = recv_buf.data().dptr<real_t>();
+        //real_t* data = recv_buf.data().dptr<real_t>();
         // false means not to delete data when SArray is deleted
         auto vals = new ps::SArray<real_t>(data, size, false);
         // issue pull
         CHECK_NOTNULL(ps_worker_)->ZPull(
-          pskv.keys, vals, &pskv.lens, kDefaultPushPull, [vals, cb](){ delete vals; cb(); });
+	    pskv.keys, vals, &pskv.lens, kDefaultPushPull, [pskv, vals, cb](){ delete vals; cb(); });
       };
 
       CHECK_NOTNULL(Engine::Get())->PushAsync(
@@ -262,17 +627,32 @@ class KVStoreDist : public KVStoreLocal {
     for (size_t i = 0; i < uniq_keys.size(); ++i) {
       // merge over devcies
       int key = uniq_keys[i];
-      const auto& vals = grouped_vals[i];
+      auto& vals = grouped_vals[i];
       NDArray merged = do_merge ? comm_->Reduce(key, vals, priority) : vals[0];
+      /*if(keys[0] == 0)
+      {
+	  printf("worker side. value = %s\n", ((NDArray*)&values[0])->Summarize().c_str());
+	  printf("worker side. vals[0] = %s\n", vals[0].Summarize().c_str());
+	  }*/
 
       auto& send_buf = comm_buf_[key];
       const auto storage_type = merged.storage_type();
       if (merged.ctx().dev_mask() == cpu::kDevMask) {
-        // Start of a push doesn't guarantee that the previous pushes are completed.
-        // This shouldn't affect training of networks though because training involves
-        // a sequence of push, pull, then push. This imposes ordering that the
-        // second push happens after the first pull, and the pull happens after first push.
-        send_buf = merged;  // avoid memory copy
+	  //don't use this shortcut.
+	  if (ps::Postoffice::Get()->van()->HasFeature(ps::Van::WorkerSidePushPullZeroCopy))
+	  {
+	      CHECK(send_buf.is_none() == false);
+	  }
+	  else if(send_buf.is_none())
+	  {
+	      send_buf = NDArray(merged.shape(), pinned_ctx_, true, merged.dtype());
+	  }
+	  //CopyFromTo(merged, &send_buf);
+	  // Start of a push doesn't guarantee that the previous pushes are completed.
+	  // This shouldn't affect training of networks though because training involves
+	  // a sequence of push, pull, then push. This imposes ordering that the
+	  // second push happens after the first pull, and the pull happens after first push.
+	  //send_buf = merged;  // avoid memory copy, PHUB: disable this shortcut.
       } else {
         if (send_buf.is_none()) {
           if (storage_type == kDefaultStorage) {
@@ -281,38 +661,78 @@ class KVStoreDist : public KVStoreLocal {
             send_buf = NDArray(storage_type, merged.shape(), pinned_ctx_, true, merged.dtype());
           }
         }
-        CopyFromTo(merged, &send_buf);
+        //CopyFromTo(merged, &send_buf);
       }
-
+      CopyFromTo(merged, &send_buf);
+      //if(key == 0)
+      //{
+//	  printf("worker side. vals[0] = %s, vals_cnt = %d\n", vals[0].Summarize().c_str(), vals.size());
+//	  printf("worker side. send_buf = %s\n", send_buf.Summarize().c_str());
+      //    }
+      //now, each key has only one corresponding merge buffer.
       // push to servers
-      if (storage_type == kDefaultStorage) {
-      auto push_to_servers =
-          [this, key, send_buf](RunContext rctx, Engine::CallbackOnComplete cb) {
-          // convert to ps keys
-          size_t size = send_buf.shape().Size();
-          PSKV& pskv = EncodeKey(key, size);
-
+      size_t size = send_buf.shape().Size();
+      real_t* data = static_cast<real_t*>(send_buf.data().dptr_);
+      //printf("worker ZPushing determining storage type %d\n", kDefaultStorage);
+	      
+      if (storage_type == kDefaultStorage) 
+      {
+	  //printf("worker ZPushing Key %d\n", key);
+
+	  auto push_to_servers =
+	      [this, key, data, size](RunContext rctx, Engine::CallbackOnComplete cb) {
+	      // convert to ps keys
+	      //size_t size = send_buf.shape().Size();
+	      PSKV& pskv = EncodeKey(key, size);
+	      
 #if MKL_EXPERIMENTAL == 1
-          mkl_set_tblob_eager_mode(send_buf.data());
+	      mkl_set_tblob_eager_mode(send_buf.data());
 #endif
-          real_t* data = send_buf.data().dptr<real_t>();
-          // do push. false means no delete
-          ps::SArray<real_t> vals(data, size, false);
-          CHECK_NOTNULL(ps_worker_)->ZPush(
-              pskv.keys, vals, pskv.lens, 0, [cb]() { cb(); });
-        };
-        Engine::Get()->PushAsync(
-            push_to_servers,
-            pinned_ctx_,
-            {send_buf.var()},
-            {},
-            FnProperty::kNormal,
-            priority,
-            PROFILER_MESSAGE("KVStoreDistDefaultPush"));
-      } else if (storage_type == kRowSparseStorage) {
-        PushRowSparse(key, send_buf, priority);
-      } else {
-        LOG(FATAL) << "unknown storage type";
+	      //real_t* data = send_buf.data().dptr<real_t>();
+	      // do push. false means no delete
+	      ps::SArray<real_t> _vals(data, size, false);
+	      //pskv contains potentially many keys.
+	      //all keys put their stuff in the SArray(vals).
+	      //the locations are clearly marked in pskv.lens.
+	      CHECK_NOTNULL(ps_worker_)->ZPush(
+		  pskv.keys, _vals, pskv.lens, 0, [cb]() { cb(); });
+	  };
+	  if (ps::Postoffice::Get()->van()->HasFeature(ps::Van::PullRequestElision) == true &&
+	      ps::Postoffice::Get()->van()->FullyInitialized() == true)
+	  {
+	      Engine::Get()->PushAsync(
+		  push_to_servers,
+		  pinned_ctx_,
+		  {},
+		  {send_buf.var()},
+		  FnProperty::kNormal,
+		  priority,
+		  PROFILER_MESSAGE("KVStoreDistDefaultPush"));
+	      CHECK(PhysicalKeyPullAddress.at(key).size() != 0);
+	      //queue another broadcast. this won't happen until push is done, and by which time 
+	      //with pull request elision enabled vans send_buf will be populated.
+	      comm_->Broadcast(key, send_buf, PhysicalKeyPullAddress.at(key), priority);
+	  }
+	  else
+	  {
+	      Engine::Get()->PushAsync(
+		  push_to_servers,
+		  pinned_ctx_,
+		  { send_buf.var() },
+		  {},
+		  FnProperty::kNormal,
+		  priority,
+		  PROFILER_MESSAGE("KVStoreDistDefaultPush"));
+	      //PUll elision disabled means it can be shared for read.
+	  }
+      }
+      else if (storage_type == kRowSparseStorage) 
+      {
+	  PushRowSparse(key, send_buf, priority);
+      }
+      else
+      {
+	  LOG(FATAL) << "unknown storage type";
       }
     }
   }
@@ -433,35 +853,67 @@ class KVStoreDist : public KVStoreLocal {
     mu_.unlock();
 
     if (!pskv.keys.empty()) {
-      CHECK_EQ(static_cast<size_t>(pskv.size), size) << "The value size cannot be changed";
+	CHECK_EQ(static_cast<size_t>(pskv.size), size) << "The value size cannot be changed key = " << key
+						       << " currval = "  << pskv.size 
+						       << " expected = " << size;
     } else {
       auto krs = ps::Postoffice::Get()->GetServerKeyRanges();
       int num_servers = krs.size();
       CHECK_GT(num_servers, 0);
 
       // a simple heuristic for load balance
-      if (size < bigarray_bound_) {
-        // send it to a single random picked server
-        int server = (key * 9973) % num_servers;
-        ps::Key ps_key = krs[server].begin() + key;
-        CHECK_LT(ps_key, krs[server].end());
-        pskv.keys.push_back(ps_key);
-        pskv.lens.push_back(size);
-        pskv.size = size;
-      } else {
-        // parition it to all servers
-        pskv.size = 0;
-        for (int i = 0; i < num_servers; ++i) {
-          size_t part_size =
-              static_cast<size_t>(round(static_cast<double>(size)/num_servers*(i+1))) -
-              static_cast<size_t>(round(static_cast<double>(size)/num_servers*i));
-          ps::Key ps_key = krs[i].begin() + key;
-          CHECK_LT(ps_key, krs[i].end());
-          pskv.keys.push_back(ps_key);
-          pskv.lens.push_back(part_size);
-          pskv.size += part_size;
-        }
-        CHECK_EQ(static_cast<size_t>(pskv.size), size);
+      if (size < bigarray_bound_)
+      {
+	  if (ps::Postoffice::Get()->van()->HasFeature(ps::Van::SupportsKeyChunking))
+	  {
+	      pskv.keys.push_back(RetrieveVirtualKeyFromPhysicalKey(key, 0));
+	  }
+	  else
+	  {
+	      // send it to a single random picked server
+	      int server = (key * 9973) % num_servers;
+	      ps::Key ps_key = krs[server].begin() + key;
+	      CHECK_LT(ps_key, krs[server].end());
+	      pskv.keys.push_back(ps_key);
+	  }
+	  pskv.lens.push_back(size);
+	  pskv.size = size;
+      } 
+      else
+      {
+	  if (ps::Postoffice::Get()->van()->HasFeature(ps::Van::SupportsKeyChunking))
+	  {
+	      pskv.size = 0;
+	      int chunks = (int)ceil(1.0 * size / bigarray_bound_);
+	      int remaining = size;
+	      for (int i = 0; i < chunks; i++)
+	      {
+		  ps::Key ps_key = RetrieveVirtualKeyFromPhysicalKey(key, i);
+		  pskv.keys.push_back(ps_key);
+		  int curr = remaining >= bigarray_bound_ ? bigarray_bound_ : remaining;
+		  pskv.lens.push_back(curr);
+		  remaining -= curr;
+		  pskv.size = size;//not sure whether we need, but carry information around.
+	      }
+	      CHECK(remaining == 0);
+	  }
+	  else
+	  {
+	      // parition it to all servers
+	      pskv.size = 0;
+	      for (int i = 0; i < num_servers; ++i) 
+	      {
+		  size_t part_size =
+		      static_cast<size_t>(round(static_cast<double>(size)/num_servers*(i+1))) -
+		      static_cast<size_t>(round(static_cast<double>(size)/num_servers*i));
+		  ps::Key ps_key = krs[i].begin() + key;
+		  CHECK_LT(ps_key, krs[i].end());
+		  pskv.keys.push_back(ps_key);
+		  pskv.lens.push_back(part_size);
+		  pskv.size += part_size;
+	      }
+	  }
+	  CHECK_EQ(static_cast<size_t>(pskv.size), size);
       }
     }
     return pskv;
diff --git a/src/kvstore/kvstore_dist_server.h b/src/kvstore/kvstore_dist_server.h
index bedb5398a0..2662a691df 100644
--- a/src/kvstore/kvstore_dist_server.h
+++ b/src/kvstore/kvstore_dist_server.h
@@ -35,6 +35,8 @@
 #include "mxnet/kvstore.h"
 #include "../operator/tensor/elemwise_binary_op-inl.h"
 #include "../operator/tensor/init_op.h"
+#include "../../ps-lite/include/dmlc/DIME.h"
+
 
 namespace mxnet {
 namespace kvstore {
@@ -117,6 +119,12 @@ class KVStoreDistServer {
         std::bind(&KVStoreDistServer::DataHandleEx, this, _1, _2, _3));
     sync_mode_ = false;
     log_verbose_ = dmlc::GetEnv("MXNET_KVSTORE_DIST_ROW_SPARSE_VERBOSE", false);
+    auto suppress = dmlc::GetEnv("PHUB_SUPPRESS_AGGREGATOR", 0);
+    if (suppress == 1)
+    {
+	SuppressAggregator = true;
+    }
+    LOG(INFO)<<"PHUB SUPPRESS AGGREGATOR toggled?" << suppress;
   }
 
   ~KVStoreDistServer() {
@@ -129,8 +137,15 @@ class KVStoreDistServer {
   }
 
   void set_updater(const KVStore::Updater& updater)  {
-    CHECK(updater);
-    updater_ = updater;
+      // PHUB_SUPPRESS_OPTIMIZER=1 discards the supplied updater (no optimization).
+      const int optimizerOff = dmlc::GetEnv("PHUB_SUPPRESS_OPTIMIZER", 0);
+      LOG(INFO) << "PHUB SUPPRESS OPTIMIZER toggled?" << optimizerOff;
+      if (optimizerOff != 1) {
+        CHECK(updater);
+        updater_ = updater;
+      } else {
+        updater_ = nullptr;
+      }
   }
 
   /**
@@ -139,7 +154,21 @@ class KVStoreDistServer {
   void Run() {
     exec_.Start();
   }
+  /**
+   * \brief choose synchronous or asynchronous handling of pushes.
+   * \param async true requests async mode; PHUB_ASYNC_MODE=1 in the
+   *        environment forces async mode whatever the caller passes.
+   */
+  void set_synch_mode(bool async)
+  {
+      const int envMode = dmlc::GetEnv("PHUB_ASYNC_MODE", 0);
+      const bool forceAsync = (envMode == 1);
+      // Assign sync_mode_ exactly once. A trailing `sync_mode_ = !async;`
+      // used to overwrite the environment override right after logging it.
+      sync_mode_ = !(forceAsync || async);
+      printf("PHUB ASYNC MODE toggled = %d\n", (sync_mode_ == false));
 
+  }
  private:
   struct MergeBuf {
     std::vector<ps::KVMeta> request;
@@ -150,7 +179,9 @@ class KVStoreDistServer {
     if (recved.head == kStopServer) {
       exec_.Stop();
     } else if (recved.head == kSyncMode) {
-      sync_mode_ = true;
+	sync_mode_ = true;
+	set_synch_mode(false);
+	
     } else {
       // let the main thread to execute ctrl, which is necessary for python
       exec_.Exec([this, recved]() {
@@ -174,27 +205,36 @@ class KVStoreDistServer {
 
   inline void ApplyUpdates(const int key, MergeBuf *merged, NDArray *stored,
                            ps::KVServer<real_t>* server) {
-    if (merged->request.size() == (size_t) ps::NumWorkers()) {
-      // let the main thread to execute updater_, which is necessary for python
-      if (updater_) {
-        exec_.Exec([this, key, merged, stored](){
-            CHECK(updater_);
-            updater_(key, merged->array, stored);
-          });
-      } else {
-        // if no updater, just copy
-        CopyFromTo(merged->array, stored);
-      }
-      if (log_verbose_)  {
-        LOG(INFO) << "sync response to " << merged->request.size() << " workers";
-      }
-      for (const auto& req : merged->request) {
-        server->Response(req);
-      }
-      merged->request.clear();
-      stored->WaitToRead();
-    } else {
-      merged->array.WaitToRead();
+    if (merged->request.size() == (size_t) ps::NumWorkers())  // as many queued requests as workers
+    {
+	// run updater_ on the main thread (via exec_), which is necessary for python
+	if (updater_) 
+	{
+	    exec_.Exec([this, key, merged, stored](){
+		    CHECK(updater_);
+		    updater_(key, merged->array, stored);
+		});
+	} 
+	else 
+	{
+	    // no updater installed: copy the merged value into the store unchanged
+	    CopyFromTo(merged->array, stored);
+	}
+	if (log_verbose_)
+	{
+	    LOG(INFO) << "sync response to " << merged->request.size() << " workers";
+	}
+	for (const auto& req : merged->request)  // answer every request queued for this key
+	{
+	    req.additionalPayload - key;  // NOTE(review): result is discarded, so this has no effect; was an assignment intended? confirm
+	    server->Response(req);
+	}
+	merged->request.clear();  // start the next round with an empty request list
+	stored->WaitToRead();  // presumably blocks until pending writes to *stored finish; confirm against NDArray docs
+    } 
+    else 
+    {
+	merged->array.WaitToRead();  // request count differs from worker count: no reply yet, only wait on the merge buffer
     }
   }
 
@@ -371,54 +411,81 @@ class KVStoreDistServer {
 
     int key = DecodeKey(req_data.keys[0]);
     auto& stored = store_[key];
-
+    
+    //if(key == 0)
+    //{
+    //printf("key 0 before is %s\n", stored.Summarize().c_str());
+    //}
     // there used several WaitToRead, this is because \a recved's memory
     // could be deallocated when this function returns. so we need to make sure
     // the operators with \a NDArray are actually finished
-    if (req_meta.push) {
-      size_t ds[] = {(size_t)req_data.lens[0]};
-      TShape dshape(ds, ds + 1);
-      TBlob recv_blob((real_t*)req_data.vals.data(), // NOLINT(*)
-                      dshape, cpu::kDevMask);
-      NDArray recved = NDArray(recv_blob, 0);
-      if (stored.is_none()) {
-        // initialization
-        stored = NDArray(dshape, Context());
-        CopyFromTo(recved, &stored, 0);
-        server->Response(req_meta);
-        stored.WaitToRead();
-      } else if (sync_mode_) {
-        // synced push
-        auto& merged = merge_buf_[key];
-        if (merged.array.is_none()) {
-          merged.array = NDArray(dshape, Context());
-        }
-        if (merged.request.size() == 0) {
-          CopyFromTo(recved, &merged.array, 0);
-        } else {
-          merged.array += recved;
-        }
-        merged.request.push_back(req_meta);
-        ApplyUpdates(key, &merged, &stored, server);
-      } else {
-        // async push
-        exec_.Exec([this, key, &recved, &stored](){
-            CHECK(updater_);
-            updater_(key, recved, &stored);
-          });
-        server->Response(req_meta);
-        stored.WaitToRead();
+    if (req_meta.push) 
+    {
+	size_t ds[] = {(size_t)req_data.lens[0]};
+	TShape dshape(ds, ds + 1);
+	TBlob recv_blob((real_t*)req_data.vals.data(), // NOLINT(*)
+			dshape, cpu::kDevMask);
+	NDArray recved = NDArray(recv_blob, 0);
+	//if(key == 0)
+	//{
+	//    printf("update vector is %s\n", recved.Summarize().c_str());
+	//}
+	if (stored.is_none())
+	{
+	    // initialization
+	  stored = NDArray(dshape, Context());
+	  CopyFromTo(recved, &stored, 0);
+	  server->Response(req_meta);
+	  stored.WaitToRead();
+	} 
+	else if (sync_mode_) 
+	{
+	    // synced push
+	    auto& merged = merge_buf_[key];
+	    if (merged.array.is_none()) 
+	    {
+		merged.array = NDArray(dshape, Context());
+	    }
+	    if(SuppressAggregator == false)
+	    {
+		if (merged.request.size() == 0) 
+		{
+		    CopyFromTo(recved, &merged.array, 0);
+		} else
+		{
+		    merged.array += recved;
+		}
+	    }
+	    merged.request.push_back(req_meta);
+	    ApplyUpdates(key, &merged, &stored, server);
+	    // if(key == 0)
+	    //{
+//		printf("end of update merge %s\n", merged.array.Summarize().c_str());
+//		printf("end of update stored %s\n", stored.Summarize().c_str());
+//	    }
+	} 
+	else
+	{
+	    // async push
+	    exec_.Exec([this, key, &recved, &stored](){
+		    CHECK(updater_);
+		    updater_(key, recved, &stored);
+		});
+	    server->Response(req_meta);
+	    stored.WaitToRead();
       }
-    } else {
-      // pull
-      ps::KVPairs<real_t> response;
-      CHECK(!stored.is_none()) << "init " << key << " first";
-      auto len = stored.shape().Size();
-      response.keys = req_data.keys;
-      response.lens = {len};
-      // TODO(mli) try to remove this CopyFrom
-      response.vals.CopyFrom(static_cast<const float*>(stored.data().dptr_), len);
-      server->Response(req_meta, response);
+    } 
+    else
+    {
+	// pull
+	ps::KVPairs<real_t> response;
+	CHECK(!stored.is_none()) << "init " << key << " first";
+	auto len = stored.shape().Size();
+	response.keys = req_data.keys;
+	response.lens = {len};
+	// TODO(mli) try to remove this CopyFrom
+	response.vals.CopyFrom(static_cast<const float*>(stored.data().dptr_), len);
+	server->Response(req_meta, response);
     }
   }
 
@@ -433,7 +500,7 @@ class KVStoreDistServer {
   bool sync_mode_;
   KVStore::Controller controller_;
   KVStore::Updater updater_;
-
+  bool SuppressAggregator = false;  // true only when PHUB_SUPPRESS_AGGREGATOR=1; then synced pushes are not merged
   std::unordered_map<int, NDArray> store_;
   std::unordered_map<int, MergeBuf> merge_buf_;
 
diff --git a/src/kvstore/kvstore_local.h b/src/kvstore/kvstore_local.h
index 15a4c6055b..957d4c6db2 100644
--- a/src/kvstore/kvstore_local.h
+++ b/src/kvstore/kvstore_local.h
@@ -143,6 +143,10 @@ class KVStoreLocal : public KVStore {
           << "duplicate init of key " << keys[i];
       local_[keys[i]] = values[i].Copy(pinned_ctx_);
       comm_->Init(keys[i], values[i].storage_type(), values[i].shape(), values[i].dtype());
+      if(keys[0] == 0)
+      {
+	  printf("init = %s\n", ((NDArray*)&values[0])->Summarize().c_str());
+      }
     }
   }
 
@@ -155,6 +159,11 @@ class KVStoreLocal : public KVStore {
     for (size_t i = 0; i < uniq_keys.size(); ++i) {
       int key = uniq_keys[i];
       const NDArray& merged = comm_->Reduce(key, grouped_vals[i], priority);
+      if(keys[0] == 0)
+      {
+	  printf("grouped_vals[i][0] = %s\n", grouped_vals[i][0].Summarize().c_str());
+      }
+      
       NDArray& local = local_[key];
       if (updater_ != nullptr) {
         CHECK(!local.is_none()) << "key " << key << " has not been inited";


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services