You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2019/01/18 19:40:20 UTC

[GitHub] alexkli commented on a change in pull request #16: Add parallel, map, and dynamic combinators

alexkli commented on a change in pull request #16: Add parallel, map, and dynamic combinators
URL: https://github.com/apache/incubator-openwhisk-composer/pull/16#discussion_r249162876
 
 

 ##########
 File path: conductor.js
 ##########
 @@ -83,10 +83,79 @@ class Compositions {
 // runtime code
 function main (composition) {
   const openwhisk = require('openwhisk')
+  const redis = require('redis')
+  const uuid = require('uuid').v4
   let wsk
+  let db
+  const expiration = 86400 // expire redis key after a day
+
+  function live (id) { return `composer/fork/${id}` }
+  function done (id) { return `composer/join/${id}` }
+
+  function createRedisClient (p) {
+    const client = redis.createClient(p.s.redis.uri, p.s.redis.ca ? { tls: { ca: Buffer.from(p.s.redis.ca, 'base64').toString('binary') } } : {})
+    const noop = () => { }
+    let handler = noop
+    client.on('error', error => handler(error))
+    require('redis-commands').list.forEach(f => {
+      client[`${f}Async`] = function () {
+        let failed = false
+        return new Promise((resolve, reject) => {
+          handler = error => {
+            handler = noop
+            failed = true
+            reject(error)
+          }
+          client[f](...arguments, (error, result) => {
+            handler = noop
+            return error ? reject(error) : resolve(result)
+          })
+        }).catch(error => {
+          if (failed) client.end(true)
+          return Promise.reject(error)
+        })
+      }
+    })
+    return client
+  }
 
   const isObject = obj => typeof obj === 'object' && obj !== null && !Array.isArray(obj)
 
+  function fork ({ p, node, index }, array, it) {
+    const saved = p.params // save params
+    p.s.state = index + node.return // return state
+    p.params = { value: [] } // return value
+    if (array.length === 0) return
+    if (typeof p.s.redis !== 'object' || typeof p.s.redis.uri !== 'string' || (typeof p.s.redis.ca !== 'string' && typeof p.s.redis.ca !== 'undefined')) {
+      p.params = { error: 'Parallel combinator requires a properly configured redis instance' }
+      console.error(p.params.error)
+      return
+    }
+    const stack = [{ marker: true }].concat(p.s.stack)
+    const barrierId = uuid()
+    console.log(`barrierId: ${barrierId}, spawning: ${array.length}`)
+    if (!wsk) wsk = openwhisk({ ignore_certs: true })
 
 Review comment:
   Should certificate verification be disabled (`ignore_certs: true`) in production code?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services