You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2019/01/18 19:50:30 UTC

[GitHub] tardieu commented on a change in pull request #16: Add parallel, map, and dynamic combinators

tardieu commented on a change in pull request #16: Add parallel, map, and dynamic combinators
URL: https://github.com/apache/incubator-openwhisk-composer/pull/16#discussion_r249166093
 
 

 ##########
 File path: conductor.js
 ##########
 @@ -83,10 +83,79 @@ class Compositions {
 // runtime code
 function main (composition) {
   const openwhisk = require('openwhisk')
+  const redis = require('redis')
+  const uuid = require('uuid').v4
   let wsk
+  let db
+  const expiration = 86400 // expire redis key after a day
+
+  function live (id) { return `composer/fork/${id}` }
+  function done (id) { return `composer/join/${id}` }
+
+  function createRedisClient (p) {
+    const client = redis.createClient(p.s.redis.uri, p.s.redis.ca ? { tls: { ca: Buffer.from(p.s.redis.ca, 'base64').toString('binary') } } : {})
+    const noop = () => { }
+    let handler = noop
+    client.on('error', error => handler(error))
+    require('redis-commands').list.forEach(f => {
+      client[`${f}Async`] = function () {
+        let failed = false
+        return new Promise((resolve, reject) => {
+          handler = error => {
+            handler = noop
+            failed = true
+            reject(error)
+          }
+          client[f](...arguments, (error, result) => {
+            handler = noop
+            return error ? reject(error) : resolve(result)
+          })
+        }).catch(error => {
+          if (failed) client.end(true)
+          return Promise.reject(error)
+        })
+      }
+    })
+    return client
+  }
 
   const isObject = obj => typeof obj === 'object' && obj !== null && !Array.isArray(obj)
 
+  function fork ({ p, node, index }, array, it) {
+    const saved = p.params // save params
+    p.s.state = index + node.return // return state
+    p.params = { value: [] } // return value
+    if (array.length === 0) return
+    if (typeof p.s.redis !== 'object' || typeof p.s.redis.uri !== 'string' || (typeof p.s.redis.ca !== 'string' && typeof p.s.redis.ca !== 'undefined')) {
+      p.params = { error: 'Parallel combinator requires a properly configured redis instance' }
+      console.error(p.params.error)
+      return
+    }
+    const stack = [{ marker: true }].concat(p.s.stack)
+    const barrierId = uuid()
+    console.log(`barrierId: ${barrierId}, spawning: ${array.length}`)
+    if (!wsk) wsk = openwhisk({ ignore_certs: true })
 
 Review comment:
   This is not something new in the PR. The PR reuses the existing `composer.async` code.
   
   This is OpenWhisk talking to itself using the host and api key defined in the execution environment, which is why we initially decided that was ok. You could maybe open a separate issue where this design choice can be discussed and possibly revised?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services