Posted to user@spark.apache.org by Jun Yang <ya...@gmail.com> on 2014/11/17 07:18:19 UTC

Questions Regarding MPI Program Migration to Spark

Guys,

Recently we have been migrating our backend pipeline to Spark.

Our pipeline contains an MPI-based HAC (hierarchical agglomerative
clustering) implementation. To ensure result consistency across the
migration, we also want to port this MPI code to Spark.

However, during the migration I ran into what looks like a limitation of
Spark.

In the original MPI implementation, the logic looks like the following:

Node 0 (master node)

     Get the complete document data, store it in g_doc_data
     Get the document subset for which this node needs to calculate the
     distance metrics, store it in l_dist_metric_data
     while (exit condition is not met) {
        Find the locally closest node pair, denoted l_closest_pair
        Get the globally closest node pair from the other nodes via MPI's
        MPI_Allreduce, denoted g_closest_pair
        Merge the globally closest node pair and update the document data
        g_doc_data
        Re-calculate the distance metrics for the node pairs impacted by the
        above merge operation, and update l_dist_metric_data
     }

Node 1/2/.../P (slave nodes)

     Get the complete document data, store it in g_doc_data
     Get the document subset for which this node needs to calculate the
     distance metrics, store it in l_dist_metric_data
     while (exit condition is not met) {
        Find the locally closest node pair, denoted l_closest_pair
        Get the globally closest node pair from the other nodes via MPI's
        MPI_Allreduce, denoted g_closest_pair
        Merge the globally closest node pair and update the document data
        g_doc_data
        Re-calculate the distance metrics for the node pairs impacted by the
        above merge operation, and update l_dist_metric_data
     }
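
For reference, a minimal Scala sketch of how the "locally closest pair, then
globally closest pair via MPI_Allreduce" step of one iteration might be
expressed with an RDD reduce. The names ClosestPair, distEntries and
globallyClosestPair are hypothetical; this only illustrates the pattern, not
a complete HAC implementation:

    import org.apache.spark.rdd.RDD

    // Hypothetical record type: one entry per candidate (i, j) pair with its
    // current distance. The RDD partitions play the role of each MPI rank's
    // l_dist_metric_data.
    case class ClosestPair(i: Int, j: Int, dist: Double)

    def globallyClosestPair(distEntries: RDD[ClosestPair]): ClosestPair = {
      // Step 1: "find the locally closest node pair" within each partition.
      val localClosest: RDD[ClosestPair] =
        distEntries.mapPartitions { it =>
          if (it.isEmpty) Iterator.empty else Iterator(it.minBy(_.dist))
        }
      // Step 2: the reduce plays the role of MPI_Allreduce with a
      // min-by-distance operator: all local winners are combined and the
      // global minimum survives, but the result lands on the driver rather
      // than on every worker, so it must be shipped back for the merge step.
      localClosest.reduce((a, b) => if (a.dist <= b.dist) a else b)
    }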

The essential difficulty in migrating the above logic to Spark is:
    In the original implementation, the computation nodes need to hold local
state between iterations (namely g_doc_data and l_dist_metric_data).
    In Spark, there does not seem to be an effective way to keep such
intermediate local state between iterations. Usually in Spark, we use either
broadcast variables or closures to pass state into the operations of each
iteration.
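
For what it's worth, the usual per-iteration broadcast pattern looks roughly
like the toy sketch below (all names and data are made up, this is not our
pipeline code):

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch of the "re-broadcast driver-side state each iteration"
    // pattern. The state map stands in for something like g_doc_data.
    object BroadcastStateSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("broadcast-state-sketch"))
        var state: Map[Int, Double] = (0 until 1000).map(i => i -> i.toDouble).toMap
        val docIds = sc.parallelize(0 until 1000)

        for (iter <- 1 to 10) {                 // stand-in for the HAC loop
          val stateBc = sc.broadcast(state)     // ship the whole state to the executors
          // The closure reads the broadcast copy on the executors.
          val summary = docIds.map(id => stateBc.value(id)).reduce(_ + _)
          // The driver updates the state; it is re-broadcast next iteration.
          state = state.map { case (k, v) => k -> (v + summary / 1000.0) }
          stateBc.unpersist()                   // drop the stale broadcast
        }
        sc.stop()
      }
    }

The whole state crosses the network on every iteration, which is exactly the
cost described below.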

Of course, after each iteration we could summarize the change effects from
all the worker nodes via a reduce and then broadcast the summarized result
back to them. But this involves significant data transfer when the data size
is large (e.g. 100 thousand documents with 500-dimensional feature vectors),
and the performance penalty is not negligible.
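
To put a rough number on that (a back-of-the-envelope estimate only, assuming
double-precision features): 100,000 documents x 500 dimensions x 8 bytes is
about 400 MB, so re-broadcasting the full document data would move hundreds
of megabytes per iteration, and bottom-up HAC needs on the order of one merge
per document.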

So my questions are:
1. Is the difficulty I mentioned above a limitation imposed by Spark's
computation paradigm?
2. Is there any feasible way to implement a bottom-up agglomerative
hierarchical clustering algorithm in Spark?

BTW, I know there are some top-down divisive hierarchical clustering
algorithms in the upcoming 1.2 release; I will also give them a try.

Thanks.
-- 
yangjunpro@gmail.com
http://hi.baidu.com/yjpro